soteria/internal/k8s/jobs.go

281 lines
8.2 KiB
Go

package k8s
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"time"
"scm.bstein.dev/bstein/soteria/internal/api"
"scm.bstein.dev/bstein/soteria/internal/config"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Label and annotation keys used on the Jobs and Secrets handled in this file.
const (
// labelAppName carries the value "soteria" on copied Secrets and is matched
// by the backup Job selectors.
labelAppName = "app.kubernetes.io/name"
// labelComponent is matched as "backup" in Job selectors and set to "restic"
// on copied Secrets.
labelComponent = "app.kubernetes.io/component"
// labelAction distinguishes "backup" from "restore" resources.
labelAction = "soteria.bstein.dev/action"
// labelPVC holds the name of the PVC a backup resource belongs to.
labelPVC = "soteria.bstein.dev/pvc"
// annotationResticRepository is read from a Job to fill BackupJobSummary.Repository.
annotationResticRepository = "soteria.bstein.dev/restic-repository"
// annotationDedupeEnabled is read from a Job to fill BackupJobSummary.DedupeEnabled.
annotationDedupeEnabled = "soteria.bstein.dev/dedupe-enabled"
// annotationKeepLast is read from a Job to fill BackupJobSummary.KeepLast.
annotationKeepLast = "soteria.bstein.dev/keep-last"
)
// BackupJobSummary is a flattened view of one backup Job, as produced by
// summarizeBackupJob.
type BackupJobSummary struct {
// Name is the Job's object name.
Name string
// Namespace is the namespace the Job lives in.
Namespace string
// PVC is the name of the PVC the Job backs up.
PVC string
// Repository is the trimmed restic repository annotation; empty when the
// annotation is absent or blank.
Repository string
// DedupeEnabled is parsed from the dedupe annotation and defaults to true.
DedupeEnabled bool
// KeepLast is parsed from the keep-last annotation and defaults to 0.
KeepLast int
// CreatedAt is the Job's creation timestamp.
CreatedAt time.Time
// CompletionTime is the Job's completion time; it stays the zero time while
// the Job status reports none.
CompletionTime time.Time
// State is one of "Pending", "Running", "Completed" or "Failed".
State string
}
// ListBackupJobs returns a summary for every soteria backup Job in namespace.
//
// Jobs are selected by the app-name, component and action labels. A Job whose
// PVC label is missing or blank is left out of the result. The summaries are
// ordered by sortBackupJobSummaries before being returned.
//
// The error from the Job list call is returned wrapped with the namespace.
func (c *Client) ListBackupJobs(ctx context.Context, namespace string) ([]BackupJobSummary, error) {
	opts := metav1.ListOptions{
		LabelSelector: fmt.Sprintf("%s=soteria,%s=backup,%s=backup", labelAppName, labelComponent, labelAction),
	}
	list, err := c.Clientset.BatchV1().Jobs(namespace).List(ctx, opts)
	if err != nil {
		return nil, fmt.Errorf("list backup jobs for namespace %s: %w", namespace, err)
	}

	summaries := make([]BackupJobSummary, 0, len(list.Items))
	for i := range list.Items {
		// Without a PVC label the Job cannot be attributed to a volume.
		claim := strings.TrimSpace(list.Items[i].Labels[labelPVC])
		if claim != "" {
			summaries = append(summaries, summarizeBackupJob(list.Items[i], claim))
		}
	}

	sortBackupJobSummaries(summaries)
	return summaries, nil
}
// ReadBackupJobLog returns the complete log of the most recent pod that
// belongs to the Job jobName in namespace.
//
// Pods are found through the "job-name" label; when several exist the one
// chosen by latestBackupJobPodName is read. An error is returned when the pod
// list call fails, when no pod carries the label, or when the log request
// fails.
func (c *Client) ReadBackupJobLog(ctx context.Context, namespace, jobName string) (string, error) {
	podClient := c.Clientset.CoreV1().Pods(namespace)

	list, err := podClient.List(ctx, metav1.ListOptions{LabelSelector: "job-name=" + jobName})
	switch {
	case err != nil:
		return "", fmt.Errorf("list pods for backup job %s/%s: %w", namespace, jobName, err)
	case len(list.Items) == 0:
		return "", fmt.Errorf("no pod found for backup job %s/%s", namespace, jobName)
	}

	target := latestBackupJobPodName(list.Items)
	logBytes, err := podClient.GetLogs(target, &corev1.PodLogOptions{}).DoRaw(ctx)
	if err != nil {
		return "", fmt.Errorf("read logs for backup job pod %s/%s: %w", namespace, target, err)
	}
	return string(logBytes), nil
}
// latestBackupJobPodName returns the name of the most recent pod in pods, or
// "" when pods is empty.
//
// Ranking rules:
//   - a pod with Status.StartTime set outranks one without;
//   - between two started pods the later start time wins;
//   - between two unstarted pods the later CreationTimestamp wins;
//   - any remaining tie goes to the lexicographically greater name, so the
//     result is deterministic.
//
// The slice is only read; its element order is left untouched.
func latestBackupJobPodName(pods []corev1.Pod) string {
	if len(pods) == 0 {
		return ""
	}

	// newer reports whether pod a outranks pod b under the rules above.
	newer := func(a, b *corev1.Pod) bool {
		left, right := a.Status.StartTime, b.Status.StartTime
		switch {
		case left != nil && right != nil:
			if !left.Time.Equal(right.Time) {
				return left.Time.After(right.Time)
			}
		case left != nil:
			return true
		case right != nil:
			return false
		default:
			if !a.CreationTimestamp.Time.Equal(b.CreationTimestamp.Time) {
				return a.CreationTimestamp.Time.After(b.CreationTimestamp.Time)
			}
		}
		return a.Name > b.Name
	}

	// A single pass is enough: only the top-ranked pod is needed.
	best := 0
	for i := 1; i < len(pods); i++ {
		if newer(&pods[i], &pods[best]) {
			best = i
		}
	}
	return pods[best].Name
}
// ListBackupJobsForPVC returns a summary for every soteria backup Job in
// namespace whose PVC label equals pvc.
//
// The summaries are ordered by sortBackupJobSummaries. The error from the Job
// list call is returned wrapped with the namespace and PVC name.
func (c *Client) ListBackupJobsForPVC(ctx context.Context, namespace, pvc string) ([]BackupJobSummary, error) {
	opts := metav1.ListOptions{
		LabelSelector: fmt.Sprintf(
			"%s=soteria,%s=backup,%s=backup,%s=%s",
			labelAppName, labelComponent, labelAction, labelPVC, pvc,
		),
	}
	list, err := c.Clientset.BatchV1().Jobs(namespace).List(ctx, opts)
	if err != nil {
		return nil, fmt.Errorf("list backup jobs for pvc %s/%s: %w", namespace, pvc, err)
	}

	summaries := make([]BackupJobSummary, 0, len(list.Items))
	for i := range list.Items {
		summaries = append(summaries, summarizeBackupJob(list.Items[i], pvc))
	}

	sortBackupJobSummaries(summaries)
	return summaries, nil
}
// summarizeBackupJob flattens job into a BackupJobSummary for the given pvc.
//
// Repository, DedupeEnabled and KeepLast come from the Job's annotations; a
// missing or blank repository annotation yields "", and the other two fall
// back to true and 0 through parseBoolWithDefault / parseIntWithDefault.
// CompletionTime stays the zero time while the Job status reports none.
//
// State is derived in this order:
//  1. a Complete or Failed Job condition with status True decides the state;
//  2. otherwise a succeeded pod means "Completed";
//  3. otherwise an active pod means "Running" — Status.Failed counts failed
//     pods, so a Job that is retrying must not be reported as failed;
//  4. otherwise a failed pod means "Failed";
//  5. otherwise the Job is "Pending".
func summarizeBackupJob(job batchv1.Job, pvc string) BackupJobSummary {
	summary := BackupJobSummary{
		Name:          job.Name,
		Namespace:     job.Namespace,
		PVC:           pvc,
		Repository:    strings.TrimSpace(job.Annotations[annotationResticRepository]),
		DedupeEnabled: parseBoolWithDefault(job.Annotations[annotationDedupeEnabled], true),
		KeepLast:      parseIntWithDefault(job.Annotations[annotationKeepLast], 0),
		CreatedAt:     job.CreationTimestamp.Time,
		State:         "Pending",
	}
	if job.Status.CompletionTime != nil {
		summary.CompletionTime = job.Status.CompletionTime.Time
	}

	// hasCondition reports whether the Job carries the given condition with
	// status True.
	hasCondition := func(kind batchv1.JobConditionType) bool {
		for _, cond := range job.Status.Conditions {
			if cond.Type == kind && cond.Status == corev1.ConditionTrue {
				return true
			}
		}
		return false
	}

	switch {
	case hasCondition(batchv1.JobComplete):
		summary.State = "Completed"
	case hasCondition(batchv1.JobFailed):
		summary.State = "Failed"
	case job.Status.Succeeded > 0:
		summary.State = "Completed"
	case job.Status.Active > 0:
		summary.State = "Running"
	case job.Status.Failed > 0:
		summary.State = "Failed"
	}
	return summary
}
// sortBackupJobSummaries orders items in place, most recent first.
//
// Each entry is ranked by its CompletionTime, or by CreatedAt when the
// completion time is zero. Entries with equal timestamps are ordered by
// descending Name.
func sortBackupJobSummaries(items []BackupJobSummary) {
	sort.Slice(items, func(a, b int) bool {
		ta, tb := items[a].CompletionTime, items[b].CompletionTime
		if ta.IsZero() {
			ta = items[a].CreatedAt
		}
		if tb.IsZero() {
			tb = items[b].CreatedAt
		}
		if !ta.Equal(tb) {
			return ta.After(tb)
		}
		return items[a].Name > items[b].Name
	})
}
// CreateBackupJob creates a restic backup Job for req.PVC in req.Namespace.
//
// It returns the Job name and the name of the per-job Secret, which is copied
// from cfg.SecretNamespace/cfg.ResticSecretName into req.Namespace. When
// req.DryRun is set, both names are computed and returned without touching
// the cluster.
//
// Errors:
//   - req.Namespace or req.PVC is empty, or req.Snapshot is set (unsupported);
//   - copying the Secret or creating the Job fails (both names are returned
//     empty; after a failed Job creation the copied Secret is deleted again);
//   - binding the Secret to the created Job fails — here the names ARE
//     returned alongside the error, because the Job already exists.
func (c *Client) CreateBackupJob(ctx context.Context, cfg *config.Config, req api.BackupRequest) (string, string, error) {
	switch {
	case req.Namespace == "":
		return "", "", errors.New("namespace is required")
	case req.PVC == "":
		return "", "", errors.New("pvc is required")
	case req.Snapshot:
		return "", "", errors.New("snapshot support is not implemented yet")
	}

	name := jobName("backup", req.PVC)
	secretName := fmt.Sprintf("soteria-%s-restic", name)
	dedupe := dedupeEnabled(req.Dedupe)
	keepLast := keepLastWithDefault(req.KeepLast)
	repository := resticRepositoryForBackup(cfg.ResticRepository, req.Namespace, req.PVC, dedupe)

	if req.DryRun {
		return name, secretName, nil
	}

	secret, err := c.copySecret(ctx, cfg.SecretNamespace, cfg.ResticSecretName, req.Namespace, secretName, map[string]string{
		labelAppName:   "soteria",
		labelComponent: "restic",
		labelAction:    "backup",
		labelPVC:       req.PVC,
	})
	if err != nil {
		return "", "", fmt.Errorf("copy restic secret for backup job %s/%s: %w", req.Namespace, name, err)
	}

	job := buildBackupJob(cfg, req, name, secretName, repository, dedupe, keepLast)

	// Best effort: when a live pod already mounts the PVC, pin the Job's pod
	// to that pod's node. A failed lookup is deliberately ignored and the Job
	// is created without a node name.
	if node, lookupErr := c.resolvePVCMountedNode(ctx, req.Namespace, req.PVC); lookupErr == nil && node != "" {
		job.Spec.Template.Spec.NodeName = node
	}

	created, err := c.Clientset.BatchV1().Jobs(req.Namespace).Create(ctx, job, metav1.CreateOptions{})
	if err != nil {
		// Best-effort cleanup of the Secret copied above; the creation error
		// is the one worth reporting, so a failed delete is ignored.
		_ = c.Clientset.CoreV1().Secrets(req.Namespace).Delete(ctx, secretName, metav1.DeleteOptions{})
		return "", "", fmt.Errorf("create backup job %s/%s: %w", req.Namespace, name, err)
	}

	if err := c.bindSecretToJob(ctx, req.Namespace, secret.Name, created); err != nil {
		return name, secretName, fmt.Errorf("bind secret %s to backup job %s/%s: %w", secret.Name, req.Namespace, name, err)
	}
	return name, secretName, nil
}
// resolvePVCMountedNode returns the node name of the first pod in namespace
// that references the claim pvc in one of its volumes.
//
// Pods that are being deleted, have no node assigned, or are in the Succeeded
// or Failed phase are skipped. When no pod qualifies the result is "" with a
// nil error; the error from the pod list call is returned unchanged.
func (c *Client) resolvePVCMountedNode(ctx context.Context, namespace, pvc string) (string, error) {
	list, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return "", err
	}

	for i := range list.Items {
		p := &list.Items[i]
		terminating := p.DeletionTimestamp != nil
		finished := p.Status.Phase == corev1.PodSucceeded || p.Status.Phase == corev1.PodFailed
		if terminating || finished || p.Spec.NodeName == "" {
			continue
		}
		for _, vol := range p.Spec.Volumes {
			if src := vol.PersistentVolumeClaim; src != nil && src.ClaimName == pvc {
				return p.Spec.NodeName, nil
			}
		}
	}
	return "", nil
}
// CreateRestoreJob creates a restic restore Job in req.Namespace.
//
// The snapshot defaults to "latest" when req.Snapshot is empty, and the
// repository defaults to cfg.ResticRepository when req.Repository is blank.
// It returns the Job name and the name of the per-job Secret, which is copied
// from cfg.SecretNamespace/cfg.ResticSecretName into req.Namespace. When
// req.DryRun is set, both names are computed and returned without touching
// the cluster.
//
// Errors:
//   - req.Namespace is empty;
//   - copying the Secret or creating the Job fails (both names are returned
//     empty; after a failed Job creation the copied Secret is deleted again);
//   - binding the Secret to the created Job fails — here the names ARE
//     returned alongside the error, because the Job already exists.
func (c *Client) CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error) {
	if req.Namespace == "" {
		return "", "", errors.New("namespace is required")
	}

	snapshot := req.Snapshot
	if snapshot == "" {
		snapshot = "latest"
	}

	name := jobName("restore", snapshot)
	secretName := fmt.Sprintf("soteria-%s-restic", name)

	repository := strings.TrimSpace(req.Repository)
	if repository == "" {
		repository = cfg.ResticRepository
	}

	if req.DryRun {
		return name, secretName, nil
	}

	secret, err := c.copySecret(ctx, cfg.SecretNamespace, cfg.ResticSecretName, req.Namespace, secretName, map[string]string{
		labelAppName:   "soteria",
		labelComponent: "restic",
		labelAction:    "restore",
	})
	if err != nil {
		return "", "", fmt.Errorf("copy restic secret for restore job %s/%s: %w", req.Namespace, name, err)
	}

	job := buildRestoreJob(cfg, req, name, secretName, snapshot, repository)
	created, err := c.Clientset.BatchV1().Jobs(req.Namespace).Create(ctx, job, metav1.CreateOptions{})
	if err != nil {
		// Best-effort cleanup of the Secret copied above; the creation error
		// is the one worth reporting, so a failed delete is ignored.
		_ = c.Clientset.CoreV1().Secrets(req.Namespace).Delete(ctx, secretName, metav1.DeleteOptions{})
		return "", "", fmt.Errorf("create restore job %s/%s: %w", req.Namespace, name, err)
	}

	if err := c.bindSecretToJob(ctx, req.Namespace, secret.Name, created); err != nil {
		return name, secretName, fmt.Errorf("bind secret %s to restore job %s/%s: %w", secret.Name, req.Namespace, name, err)
	}
	return name, secretName, nil
}