ariadne/ariadne/services/jenkins_workspace_cleanup.py

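"""Cleanup of stale Jenkins workspace storage.

Scans pods, PersistentVolumeClaims, PersistentVolumes, and Longhorn volumes for
workspace objects that match the configured Jenkins workspace PVC prefix, are no
longer referenced, and are older than the configured minimum age, then deletes
them (or only reports them when dry-run is enabled) and publishes Prometheus
metrics for the pass.
"""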

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any
from prometheus_client import Counter, Gauge
from ..k8s.client import delete_json, get_json
from ..settings import settings
from ..utils.logging import get_logger
logger = get_logger(__name__)

JENKINS_WORKSPACE_CLEANUP_RUNS_TOTAL = Counter(
    "ariadne_jenkins_workspace_cleanup_runs_total",
    "Jenkins workspace cleanup runs by status and mode",
    ["status", "mode"],
)
JENKINS_WORKSPACE_CLEANUP_OBJECTS_TOTAL = Counter(
    "ariadne_jenkins_workspace_cleanup_objects_total",
    "Jenkins workspace cleanup objects by kind, action, and mode",
    ["kind", "action", "mode"],
)
JENKINS_WORKSPACE_CLEANUP_LAST_RUN_TS = Gauge(
    "ariadne_jenkins_workspace_cleanup_last_run_timestamp_seconds",
    "Last Jenkins workspace cleanup run timestamp",
)
JENKINS_WORKSPACE_CLEANUP_LAST_SUCCESS_TS = Gauge(
    "ariadne_jenkins_workspace_cleanup_last_success_timestamp_seconds",
    "Last successful Jenkins workspace cleanup timestamp",
)
JENKINS_WORKSPACE_CLEANUP_LAST_FAILURE_TS = Gauge(
    "ariadne_jenkins_workspace_cleanup_last_failure_timestamp_seconds",
    "Last failed Jenkins workspace cleanup timestamp",
)
JENKINS_WORKSPACE_CLEANUP_LAST_DELETED = Gauge(
    "ariadne_jenkins_workspace_cleanup_last_deleted_total",
    "Last Jenkins workspace cleanup deleted object count",
    ["kind"],
)
JENKINS_WORKSPACE_CLEANUP_LAST_PLANNED = Gauge(
    "ariadne_jenkins_workspace_cleanup_last_planned_total",
    "Last Jenkins workspace cleanup planned object count",
    ["kind"],
)
JENKINS_WORKSPACE_CLEANUP_LAST_SKIPPED = Gauge(
    "ariadne_jenkins_workspace_cleanup_last_skipped_total",
    "Last Jenkins workspace cleanup skipped object count",
)
JENKINS_WORKSPACE_CLEANUP_LAST_FAILURES = Gauge(
    "ariadne_jenkins_workspace_cleanup_last_failures_total",
    "Last Jenkins workspace cleanup failure count",
)
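# All collectors above attach to prometheus_client's default registry when this
# module is imported; _record_metrics() below refreshes them once per cleanup pass.
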
@dataclass(frozen=True)
class JenkinsWorkspaceCleanupSummary:
    """Summarize one Jenkins workspace-storage cleanup pass.

    Inputs: Kubernetes PV/PVC/Longhorn objects fetched from the API server.
    Outputs: deterministic counters for operator logs and metrics.
    """

    pvs_planned: int
    pvcs_planned: int
    volumes_planned: int
    pvs_deleted: int
    pvcs_deleted: int
    volumes_deleted: int
    skipped: int
    failures: int
    dry_run: bool

    @property
    def planned(self) -> int:
        return self.pvs_planned + self.pvcs_planned + self.volumes_planned

    @property
    def deleted(self) -> int:
        return self.pvs_deleted + self.pvcs_deleted + self.volumes_deleted
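
# Illustrative only (hypothetical counts): a dry-run pass that planned one object
# of each kind and deleted nothing reports planned == 3 and deleted == 0:
#
#     s = JenkinsWorkspaceCleanupSummary(
#         pvs_planned=1, pvcs_planned=1, volumes_planned=1,
#         pvs_deleted=0, pvcs_deleted=0, volumes_deleted=0,
#         skipped=0, failures=0, dry_run=True,
#     )
#     assert (s.planned, s.deleted) == (3, 0)
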
@dataclass(frozen=True)
class _CleanupCandidate:
    name: str
    kind: str
    path: str
    created_at: datetime | None
    related_pvc: str | None = None
    pv_name: str | None = None
def _parse_timestamp(raw: str) -> datetime | None:
    """Parse Kubernetes RFC3339 timestamps into timezone-aware datetimes."""
    normalized = raw.replace("Z", "+00:00")
    try:
        return datetime.fromisoformat(normalized)
    except ValueError:
        return None


def _created_at(metadata: dict[str, Any]) -> datetime | None:
    raw = metadata.get("creationTimestamp")
    if not isinstance(raw, str) or not raw:
        return None
    return _parse_timestamp(raw)


def _is_old_enough(metadata: dict[str, Any]) -> bool:
    """Return True when an object's age exceeds the configured cleanup threshold."""
    created_at = _created_at(metadata)
    if created_at is None:
        return False
    min_age = timedelta(hours=settings.jenkins_workspace_cleanup_min_age_hours)
    return datetime.now(timezone.utc) - created_at >= min_age


def _is_deleting(metadata: dict[str, Any]) -> bool:
    deletion_ts = metadata.get("deletionTimestamp")
    return isinstance(deletion_ts, str) and bool(deletion_ts.strip())


def _is_workspace_name(name: Any) -> bool:
    return isinstance(name, str) and name.startswith(settings.jenkins_workspace_pvc_prefix)
def _active_workspace_claims() -> set[str]:
    """Collect currently referenced Jenkins workspace PVC names from pods."""
    namespace = settings.jenkins_workspace_namespace
    payload = get_json(f"/api/v1/namespaces/{namespace}/pods")
    items = payload.get("items") if isinstance(payload.get("items"), list) else []
    active: set[str] = set()
    for pod in items:
        if not isinstance(pod, dict):
            continue
        metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
        annotations = metadata.get("annotations") if isinstance(metadata.get("annotations"), dict) else {}
        spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}
        volumes = spec.get("volumes") if isinstance(spec.get("volumes"), list) else []
        for volume in volumes:
            if not isinstance(volume, dict):
                continue
            claim = volume.get("persistentVolumeClaim")
            if not isinstance(claim, dict):
                continue
            claim_name = claim.get("claimName")
            if _is_workspace_name(claim_name):
                active.add(claim_name)
        # A pod can also declare its workspace claim via an annotation; honor it
        # even when the PVC is not mounted directly.
        claim_name = annotations.get("jenkins.io/workspace-pvc")
        if _is_workspace_name(claim_name):
            active.add(claim_name)
    return active
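
# For reference, a claim counts as "in use" when either pod shape below is present
# (names are illustrative; <prefix> stands for settings.jenkins_workspace_pvc_prefix):
#
#   spec:
#     volumes:
#       - persistentVolumeClaim:
#           claimName: <prefix>example              # direct PVC mount
#   metadata:
#     annotations:
#       jenkins.io/workspace-pvc: <prefix>example   # annotation-declared claim
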
def _workspace_pv_candidates(active_claims: set[str]) -> tuple[list[_CleanupCandidate], set[str]]:
    """Find releasable Jenkins workspace PVs and keep a set of all PV names."""
    namespace = settings.jenkins_workspace_namespace
    payload = get_json("/api/v1/persistentvolumes")
    items = payload.get("items") if isinstance(payload.get("items"), list) else []
    candidates: list[_CleanupCandidate] = []
    all_pv_names: set[str] = set()
    for pv in items:
        if not isinstance(pv, dict):
            continue
        metadata = pv.get("metadata") if isinstance(pv.get("metadata"), dict) else {}
        status = pv.get("status") if isinstance(pv.get("status"), dict) else {}
        spec = pv.get("spec") if isinstance(pv.get("spec"), dict) else {}
        name = metadata.get("name")
        if isinstance(name, str) and name:
            all_pv_names.add(name)
        claim_ref = spec.get("claimRef") if isinstance(spec.get("claimRef"), dict) else {}
        claim_namespace = claim_ref.get("namespace")
        claim_name = claim_ref.get("name")
        phase = status.get("phase")
        if claim_namespace != namespace:
            continue
        if not _is_workspace_name(claim_name):
            continue
        if _is_deleting(metadata):
            continue
        if claim_name in active_claims:
            continue
        if phase not in {"Released", "Failed"}:
            continue
        if not _is_old_enough(metadata):
            continue
        if not isinstance(name, str) or not name:
            continue
        candidates.append(
            _CleanupCandidate(
                name=name,
                kind="pv",
                path=f"/api/v1/persistentvolumes/{name}",
                created_at=_created_at(metadata),
                related_pvc=claim_name if isinstance(claim_name, str) else None,
            )
        )
    return candidates, all_pv_names
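
# A PV only becomes a candidate after every gate above passes: its claimRef points
# at a workspace PVC in the Jenkins namespace, that claim is not referenced by any
# pod, the PV is Released or Failed, it is not already terminating, and it is
# older than the configured minimum age.
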
def _workspace_pvc_candidates(active_claims: set[str]) -> list[_CleanupCandidate]:
    """Find stale Jenkins workspace PVCs that are not actively referenced."""
    namespace = settings.jenkins_workspace_namespace
    payload = get_json(f"/api/v1/namespaces/{namespace}/persistentvolumeclaims")
    items = payload.get("items") if isinstance(payload.get("items"), list) else []
    candidates: list[_CleanupCandidate] = []
    for pvc in items:
        if not isinstance(pvc, dict):
            continue
        metadata = pvc.get("metadata") if isinstance(pvc.get("metadata"), dict) else {}
        status = pvc.get("status") if isinstance(pvc.get("status"), dict) else {}
        claim_name = metadata.get("name")
        phase = status.get("phase")
        if not _is_workspace_name(claim_name):
            continue
        if _is_deleting(metadata):
            continue
        if claim_name in active_claims:
            continue
        if phase == "Bound":
            continue
        if not _is_old_enough(metadata):
            continue
        if not isinstance(claim_name, str) or not claim_name:
            continue
        candidates.append(
            _CleanupCandidate(
                name=claim_name,
                kind="pvc",
                path=f"/api/v1/namespaces/{namespace}/persistentvolumeclaims/{claim_name}",
                created_at=_created_at(metadata),
            )
        )
    return candidates
def _workspace_longhorn_candidates(all_pv_names: set[str], removed_pv_names: set[str]) -> list[_CleanupCandidate]:
    """Find orphaned Longhorn volumes whose backing workspace PV is gone or being removed."""
    namespace = "longhorn-system"
    payload = get_json(f"/apis/longhorn.io/v1beta2/namespaces/{namespace}/volumes")
    items = payload.get("items") if isinstance(payload.get("items"), list) else []
    candidates: list[_CleanupCandidate] = []
    for volume in items:
        if not isinstance(volume, dict):
            continue
        metadata = volume.get("metadata") if isinstance(volume.get("metadata"), dict) else {}
        status = volume.get("status") if isinstance(volume.get("status"), dict) else {}
        spec = volume.get("spec") if isinstance(volume.get("spec"), dict) else {}
        name = metadata.get("name")
        if not isinstance(name, str) or not name:
            continue
        labels = metadata.get("labels") if isinstance(metadata.get("labels"), dict) else {}
        pvc_name = labels.get("kubernetes.io/created-for/pvc/name")
        robust_state = status.get("robustness")
        state = status.get("state")
        attached = status.get("isAttached")
        frontend = spec.get("frontend")
        should_delete = False
        if name in removed_pv_names:
            should_delete = True
        elif _is_workspace_name(pvc_name) and name not in all_pv_names:
            should_delete = True
        if not should_delete:
            continue
        if _is_deleting(metadata):
            continue
        if not _is_old_enough(metadata):
            continue
        if state not in {None, "detached", "faulted", "unknown"}:
            continue
        if attached is True:
            continue
        if robust_state not in {None, "unknown", "faulted", "degraded"}:
            continue
        if frontend not in {None, "", "blockdev"}:
            continue
        candidates.append(
            _CleanupCandidate(
                name=name,
                kind="longhorn_volume",
                path=f"/apis/longhorn.io/v1beta2/namespaces/{namespace}/volumes/{name}",
                created_at=_created_at(metadata),
                pv_name=name,
            )
        )
    return candidates
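
# The gates above only offer Longhorn volumes that are detached (or faulted /
# unknown), not attached to any node, and whose workspace PV is either already
# gone or was removed earlier in the same pass.
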
def _record_metrics(summary: JenkinsWorkspaceCleanupSummary) -> None:
    """Publish the latest cleanup summary to the Prometheus counters and gauges."""
    mode = "dry_run" if summary.dry_run else "delete"
    status = "ok" if summary.failures == 0 else "error"
    now_ts = datetime.now(timezone.utc).timestamp()
    JENKINS_WORKSPACE_CLEANUP_RUNS_TOTAL.labels(status=status, mode=mode).inc()
    if summary.failures:
        JENKINS_WORKSPACE_CLEANUP_LAST_FAILURE_TS.set(now_ts)
    else:
        JENKINS_WORKSPACE_CLEANUP_LAST_SUCCESS_TS.set(now_ts)
    JENKINS_WORKSPACE_CLEANUP_LAST_RUN_TS.set(now_ts)
    JENKINS_WORKSPACE_CLEANUP_LAST_DELETED.labels(kind="pvc").set(summary.pvcs_deleted)
    JENKINS_WORKSPACE_CLEANUP_LAST_DELETED.labels(kind="pv").set(summary.pvs_deleted)
    JENKINS_WORKSPACE_CLEANUP_LAST_DELETED.labels(kind="longhorn_volume").set(summary.volumes_deleted)
    JENKINS_WORKSPACE_CLEANUP_LAST_PLANNED.labels(kind="pvc").set(summary.pvcs_planned)
    JENKINS_WORKSPACE_CLEANUP_LAST_PLANNED.labels(kind="pv").set(summary.pvs_planned)
    JENKINS_WORKSPACE_CLEANUP_LAST_PLANNED.labels(kind="longhorn_volume").set(summary.volumes_planned)
    JENKINS_WORKSPACE_CLEANUP_LAST_SKIPPED.set(summary.skipped)
    JENKINS_WORKSPACE_CLEANUP_LAST_FAILURES.set(summary.failures)
    for kind, planned, deleted in (
        ("pvc", summary.pvcs_planned, summary.pvcs_deleted),
        ("pv", summary.pvs_planned, summary.pvs_deleted),
        ("longhorn_volume", summary.volumes_planned, summary.volumes_deleted),
    ):
        if planned:
            JENKINS_WORKSPACE_CLEANUP_OBJECTS_TOTAL.labels(kind=kind, action="planned", mode=mode).inc(planned)
        if deleted:
            JENKINS_WORKSPACE_CLEANUP_OBJECTS_TOTAL.labels(kind=kind, action="deleted", mode=mode).inc(deleted)
    if summary.skipped:
        JENKINS_WORKSPACE_CLEANUP_OBJECTS_TOTAL.labels(
            kind="cleanup",
            action="skipped",
            mode=mode,
        ).inc(summary.skipped)
    if summary.failures:
        JENKINS_WORKSPACE_CLEANUP_OBJECTS_TOTAL.labels(
            kind="cleanup",
            action="failed",
            mode=mode,
        ).inc(summary.failures)
def cleanup_jenkins_workspace_storage() -> JenkinsWorkspaceCleanupSummary:
    """Delete stale Jenkins workspace PVC/PV artifacts and orphaned Longhorn volumes."""
    namespace = settings.jenkins_workspace_namespace
    dry_run = settings.jenkins_workspace_cleanup_dry_run
    max_deletions = settings.jenkins_workspace_cleanup_max_deletions_per_run
    prefix = settings.jenkins_workspace_pvc_prefix.strip()
    pvs_deleted = 0
    pvcs_deleted = 0
    volumes_deleted = 0
    skipped = 0
    failures = 0
    stale_pvs: list[_CleanupCandidate] = []
    stale_pvcs: list[_CleanupCandidate] = []
    stale_volumes: list[_CleanupCandidate] = []
    summary: JenkinsWorkspaceCleanupSummary
    try:
        if not namespace.strip():
            raise ValueError("jenkins workspace cleanup namespace is empty")
        if not prefix:
            raise ValueError("jenkins workspace cleanup pvc prefix is empty")
        if settings.jenkins_workspace_cleanup_min_age_hours < 1.0:
            raise ValueError("jenkins workspace cleanup min age must be >= 1 hour")
        if max_deletions < 1:
            raise ValueError("jenkins workspace cleanup max deletions must be >= 1")
        active_claims = _active_workspace_claims()
        stale_pvs, all_pv_names = _workspace_pv_candidates(active_claims)
        stale_pvcs = _workspace_pvc_candidates(active_claims)
        removed_pv_names: set[str] = set()
        stale_volumes = _workspace_longhorn_candidates(all_pv_names, removed_pv_names)
        planned_total = len(stale_pvs) + len(stale_pvcs) + len(stale_volumes)
        deletion_budget: int | None = None
        if dry_run:
            logger.info(
                "jenkins workspace cleanup dry-run enabled",
                extra={
                    "event": "jenkins_workspace_cleanup",
                    "status": "dry_run",
                    "namespace": namespace,
                    "dry_run": True,
                    "planned_pvs": len(stale_pvs),
                    "planned_pvcs": len(stale_pvcs),
                    "planned_volumes": len(stale_volumes),
                    "max_deletions": max_deletions,
                },
            )
        else:
            deletion_budget = max_deletions
        if not dry_run and planned_total > max_deletions:
            logger.warning(
                "jenkins workspace cleanup capped by max deletions guard",
                extra={
                    "event": "jenkins_workspace_cleanup",
                    "status": "guard_capped",
                    "namespace": namespace,
                    "dry_run": False,
                    "planned_total": planned_total,
                    "max_deletions": max_deletions,
                    "planned_pvs": len(stale_pvs),
                    "planned_pvcs": len(stale_pvcs),
                    "planned_volumes": len(stale_volumes),
                },
            )
        for pvc in stale_pvcs:
            claim_name = pvc.name
            if not claim_name:
                skipped += 1
                continue
            if dry_run:
                continue
            if deletion_budget is not None and deletion_budget <= 0:
                skipped += 1
                continue
            if deletion_budget is not None:
                deletion_budget -= 1
            try:
                delete_json(pvc.path)
                pvcs_deleted += 1
            except Exception as exc:
                failures += 1
                logger.warning(
                    "jenkins workspace pvc delete failed",
                    extra={"event": "jenkins_workspace_cleanup", "claim": claim_name, "detail": str(exc)},
                )
        for pv in stale_pvs:
            pv_name = pv.name
            if not pv_name:
                skipped += 1
                continue
            if dry_run:
                continue
            if deletion_budget is not None and deletion_budget <= 0:
                skipped += 1
                continue
            if deletion_budget is not None:
                deletion_budget -= 1
            try:
                delete_json(pv.path)
                removed_pv_names.add(pv_name)
                pvs_deleted += 1
            except Exception as exc:
                failures += 1
                logger.warning(
                    "jenkins workspace pv delete failed",
                    extra={"event": "jenkins_workspace_cleanup", "pv": pv_name, "detail": str(exc)},
                )
        # Recompute Longhorn candidates using the updated removed PV list.
        stale_volumes = _workspace_longhorn_candidates(all_pv_names, removed_pv_names)
        for volume in stale_volumes:
            if not volume.name:
                skipped += 1
                continue
            if dry_run:
                continue
            if deletion_budget is not None and deletion_budget <= 0:
                skipped += 1
                continue
            if deletion_budget is not None:
                deletion_budget -= 1
            try:
                delete_json(volume.path)
                volumes_deleted += 1
            except Exception as exc:
                failures += 1
                logger.warning(
                    "jenkins workspace longhorn volume delete failed",
                    extra={"event": "jenkins_workspace_cleanup", "volume": volume.name, "detail": str(exc)},
                )
        summary = JenkinsWorkspaceCleanupSummary(
            pvs_planned=len(stale_pvs),
            pvcs_planned=len(stale_pvcs),
            volumes_planned=len(stale_volumes),
            pvs_deleted=pvs_deleted,
            pvcs_deleted=pvcs_deleted,
            volumes_deleted=volumes_deleted,
            skipped=skipped,
            failures=failures,
            dry_run=dry_run,
        )
    except Exception as exc:
        failures += 1
        logger.exception(
            "jenkins workspace cleanup failed",
            extra={"event": "jenkins_workspace_cleanup", "status": "error", "namespace": namespace, "detail": str(exc)},
        )
        summary = JenkinsWorkspaceCleanupSummary(
            pvs_planned=len(stale_pvs),
            pvcs_planned=len(stale_pvcs),
            volumes_planned=len(stale_volumes),
            pvs_deleted=pvs_deleted,
            pvcs_deleted=pvcs_deleted,
            volumes_deleted=volumes_deleted,
            skipped=skipped,
            failures=failures,
            dry_run=dry_run,
        )
        _record_metrics(summary)
        raise
    _record_metrics(summary)
    logger.info(
        "jenkins workspace cleanup finished",
        extra={
            "event": "jenkins_workspace_cleanup",
            "status": "ok" if failures == 0 else "error",
            "dry_run": dry_run,
            "namespace": namespace,
            "planned_pvs": summary.pvs_planned,
            "planned_pvcs": summary.pvcs_planned,
            "planned_volumes": summary.volumes_planned,
            "deleted_pvs": pvs_deleted,
            "deleted_pvcs": pvcs_deleted,
            "deleted_volumes": volumes_deleted,
            "skipped": skipped,
            "failures": failures,
        },
    )
    return summary
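
# Minimal usage sketch (illustrative; it assumes the surrounding Ariadne settings
# and the in-cluster Kubernetes client are already configured, and it is not the
# scheduler wiring used by the service itself):
#
#     from ariadne.services.jenkins_workspace_cleanup import (
#         cleanup_jenkins_workspace_storage,
#     )
#
#     summary = cleanup_jenkins_workspace_storage()
#     print(summary.planned, summary.deleted, summary.failures)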