package k8s import ( "context" "errors" "fmt" "sort" "strings" "time" "scm.bstein.dev/bstein/soteria/internal/api" "scm.bstein.dev/bstein/soteria/internal/config" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( labelAppName = "app.kubernetes.io/name" labelComponent = "app.kubernetes.io/component" labelAction = "soteria.bstein.dev/action" labelPVC = "soteria.bstein.dev/pvc" annotationResticRepository = "soteria.bstein.dev/restic-repository" annotationDedupeEnabled = "soteria.bstein.dev/dedupe-enabled" annotationKeepLast = "soteria.bstein.dev/keep-last" ) // BackupJobSummary captures the backup job details shown in the UI and metrics. type BackupJobSummary struct { Name string Namespace string PVC string Repository string DedupeEnabled bool KeepLast int CreatedAt time.Time CompletionTime time.Time State string } // ListBackupJobs returns backup jobs in a namespace, newest first. func (c *Client) ListBackupJobs(ctx context.Context, namespace string) ([]BackupJobSummary, error) { selector := fmt.Sprintf("%s=soteria,%s=backup,%s=backup", labelAppName, labelComponent, labelAction) jobs, err := c.Clientset.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) if err != nil { return nil, fmt.Errorf("list backup jobs for namespace %s: %w", namespace, err) } out := make([]BackupJobSummary, 0, len(jobs.Items)) for _, job := range jobs.Items { pvc := strings.TrimSpace(job.Labels[labelPVC]) if pvc == "" { continue } out = append(out, summarizeBackupJob(job, pvc)) } sortBackupJobSummaries(out) return out, nil } // ReadBackupJobLog fetches the log stream for the most recent pod of a backup job. func (c *Client) ReadBackupJobLog(ctx context.Context, namespace, jobName string) (string, error) { selector := fmt.Sprintf("job-name=%s", jobName) pods, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) if err != nil { return "", fmt.Errorf("list pods for backup job %s/%s: %w", namespace, jobName, err) } if len(pods.Items) == 0 { return "", fmt.Errorf("no pod found for backup job %s/%s", namespace, jobName) } podName := latestBackupJobPodName(pods.Items) raw, err := c.Clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{}).DoRaw(ctx) if err != nil { return "", fmt.Errorf("read logs for backup job pod %s/%s: %w", namespace, podName, err) } return string(raw), nil } func latestBackupJobPodName(pods []corev1.Pod) string { sort.Slice(pods, func(i, j int) bool { left := pods[i].Status.StartTime right := pods[j].Status.StartTime switch { case left != nil && right != nil: if left.Time.Equal(right.Time) { return pods[i].Name > pods[j].Name } return left.Time.After(right.Time) case left != nil: return true case right != nil: return false default: return pods[i].CreationTimestamp.Time.After(pods[j].CreationTimestamp.Time) } }) return pods[0].Name } // ListBackupJobsForPVC returns backup jobs for a single PVC in newest-first order. func (c *Client) ListBackupJobsForPVC(ctx context.Context, namespace, pvc string) ([]BackupJobSummary, error) { selector := fmt.Sprintf("%s=soteria,%s=backup,%s=backup,%s=%s", labelAppName, labelComponent, labelAction, labelPVC, pvc) jobs, err := c.Clientset.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) if err != nil { return nil, fmt.Errorf("list backup jobs for pvc %s/%s: %w", namespace, pvc, err) } out := make([]BackupJobSummary, 0, len(jobs.Items)) for _, job := range jobs.Items { out = append(out, summarizeBackupJob(job, pvc)) } sortBackupJobSummaries(out) return out, nil } func summarizeBackupJob(job batchv1.Job, pvc string) BackupJobSummary { summary := BackupJobSummary{ Name: job.Name, Namespace: job.Namespace, PVC: pvc, DedupeEnabled: true, KeepLast: 0, CreatedAt: job.CreationTimestamp.Time, State: "Pending", } if raw := strings.TrimSpace(job.Annotations[annotationResticRepository]); raw != "" { summary.Repository = raw } summary.DedupeEnabled = parseBoolWithDefault(job.Annotations[annotationDedupeEnabled], true) summary.KeepLast = parseIntWithDefault(job.Annotations[annotationKeepLast], 0) if job.Status.CompletionTime != nil { summary.CompletionTime = job.Status.CompletionTime.Time } switch { case job.Status.Succeeded > 0: summary.State = "Completed" case job.Status.Failed > 0: summary.State = "Failed" case job.Status.Active > 0: summary.State = "Running" } return summary } func sortBackupJobSummaries(items []BackupJobSummary) { sort.Slice(items, func(i, j int) bool { left := items[i].CompletionTime if left.IsZero() { left = items[i].CreatedAt } right := items[j].CompletionTime if right.IsZero() { right = items[j].CreatedAt } if left.Equal(right) { return items[i].Name > items[j].Name } return left.After(right) }) } // CreateBackupJob provisions the Kubernetes job and companion secret for a backup request. func (c *Client) CreateBackupJob(ctx context.Context, cfg *config.Config, req api.BackupRequest) (string, string, error) { if req.Namespace == "" { return "", "", errors.New("namespace is required") } if req.PVC == "" { return "", "", errors.New("pvc is required") } if req.Snapshot { return "", "", errors.New("snapshot support is not implemented yet") } jobName := jobName("backup", req.PVC) secretName := fmt.Sprintf("soteria-%s-restic", jobName) dedupeEnabled := dedupeEnabled(req.Dedupe) keepLast := keepLastWithDefault(req.KeepLast) repository := resticRepositoryForBackup(cfg.ResticRepository, req.Namespace, req.PVC, dedupeEnabled) if req.DryRun { return jobName, secretName, nil } secret, err := c.copySecret(ctx, cfg.SecretNamespace, cfg.ResticSecretName, req.Namespace, secretName, map[string]string{ labelAppName: "soteria", labelComponent: "restic", labelAction: "backup", labelPVC: req.PVC, }) if err != nil { return "", "", err } job := buildBackupJob(cfg, req, jobName, secretName, repository, dedupeEnabled, keepLast) if nodeName, err := c.resolvePVCMountedNode(ctx, req.Namespace, req.PVC); err == nil && nodeName != "" { job.Spec.Template.Spec.NodeName = nodeName } created, err := c.Clientset.BatchV1().Jobs(req.Namespace).Create(ctx, job, metav1.CreateOptions{}) if err != nil { _ = c.Clientset.CoreV1().Secrets(req.Namespace).Delete(ctx, secretName, metav1.DeleteOptions{}) return "", "", err } if err := c.bindSecretToJob(ctx, req.Namespace, secret.Name, created); err != nil { return jobName, secretName, err } return jobName, secretName, nil } func (c *Client) resolvePVCMountedNode(ctx context.Context, namespace, pvc string) (string, error) { pods, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) if err != nil { return "", err } for _, pod := range pods.Items { if pod.DeletionTimestamp != nil || pod.Spec.NodeName == "" { continue } if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { continue } for _, volume := range pod.Spec.Volumes { claim := volume.PersistentVolumeClaim if claim == nil { continue } if claim.ClaimName == pvc { return pod.Spec.NodeName, nil } } } return "", nil } // CreateRestoreJob provisions the Kubernetes job and companion secret for a restore request. func (c *Client) CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error) { if req.Namespace == "" { return "", "", errors.New("namespace is required") } snapshot := req.Snapshot if snapshot == "" { snapshot = "latest" } jobName := jobName("restore", snapshot) secretName := fmt.Sprintf("soteria-%s-restic", jobName) repository := strings.TrimSpace(req.Repository) if repository == "" { repository = cfg.ResticRepository } if req.DryRun { return jobName, secretName, nil } secret, err := c.copySecret(ctx, cfg.SecretNamespace, cfg.ResticSecretName, req.Namespace, secretName, map[string]string{ labelAppName: "soteria", labelComponent: "restic", labelAction: "restore", }) if err != nil { return "", "", err } job := buildRestoreJob(cfg, req, jobName, secretName, snapshot, repository) created, err := c.Clientset.BatchV1().Jobs(req.Namespace).Create(ctx, job, metav1.CreateOptions{}) if err != nil { _ = c.Clientset.CoreV1().Secrets(req.Namespace).Delete(ctx, secretName, metav1.DeleteOptions{}) return "", "", err } if err := c.bindSecretToJob(ctx, req.Namespace, secret.Name, created); err != nil { return jobName, secretName, err } return jobName, secretName, nil }