package k8s import ( "context" "errors" "fmt" "sort" "strconv" "strings" "time" "scm.bstein.dev/bstein/soteria/internal/api" "scm.bstein.dev/bstein/soteria/internal/config" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( labelAppName = "app.kubernetes.io/name" labelComponent = "app.kubernetes.io/component" labelAction = "soteria.bstein.dev/action" labelPVC = "soteria.bstein.dev/pvc" annotationResticRepository = "soteria.bstein.dev/restic-repository" annotationDedupeEnabled = "soteria.bstein.dev/dedupe-enabled" annotationKeepLast = "soteria.bstein.dev/keep-last" ) type BackupJobSummary struct { Name string Namespace string PVC string Repository string DedupeEnabled bool KeepLast int CreatedAt time.Time CompletionTime time.Time State string } func (c *Client) ListBackupJobs(ctx context.Context, namespace string) ([]BackupJobSummary, error) { selector := fmt.Sprintf("%s=soteria,%s=backup,%s=backup", labelAppName, labelComponent, labelAction) jobs, err := c.Clientset.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) if err != nil { return nil, fmt.Errorf("list backup jobs for namespace %s: %w", namespace, err) } out := make([]BackupJobSummary, 0, len(jobs.Items)) for _, job := range jobs.Items { pvc := strings.TrimSpace(job.Labels[labelPVC]) if pvc == "" { continue } out = append(out, summarizeBackupJob(job, pvc)) } sortBackupJobSummaries(out) return out, nil } func (c *Client) ReadBackupJobLog(ctx context.Context, namespace, jobName string) (string, error) { selector := fmt.Sprintf("job-name=%s", jobName) pods, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) if err != nil { return "", fmt.Errorf("list pods for backup job %s/%s: %w", namespace, jobName, err) } if len(pods.Items) == 0 { return "", fmt.Errorf("no pod found for backup job %s/%s", namespace, jobName) } sort.Slice(pods.Items, func(i, j int) bool { left := pods.Items[i].Status.StartTime right := pods.Items[j].Status.StartTime switch { case left != nil && right != nil: if left.Time.Equal(right.Time) { return pods.Items[i].Name > pods.Items[j].Name } return left.Time.After(right.Time) case left != nil: return true case right != nil: return false default: return pods.Items[i].CreationTimestamp.Time.After(pods.Items[j].CreationTimestamp.Time) } }) podName := pods.Items[0].Name raw, err := c.Clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{}).DoRaw(ctx) if err != nil { return "", fmt.Errorf("read logs for backup job pod %s/%s: %w", namespace, podName, err) } return string(raw), nil } func (c *Client) ListBackupJobsForPVC(ctx context.Context, namespace, pvc string) ([]BackupJobSummary, error) { selector := fmt.Sprintf("%s=soteria,%s=backup,%s=backup,%s=%s", labelAppName, labelComponent, labelAction, labelPVC, pvc) jobs, err := c.Clientset.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) if err != nil { return nil, fmt.Errorf("list backup jobs for pvc %s/%s: %w", namespace, pvc, err) } out := make([]BackupJobSummary, 0, len(jobs.Items)) for _, job := range jobs.Items { out = append(out, summarizeBackupJob(job, pvc)) } sortBackupJobSummaries(out) return out, nil } func summarizeBackupJob(job batchv1.Job, pvc string) BackupJobSummary { summary := BackupJobSummary{ Name: job.Name, Namespace: job.Namespace, PVC: pvc, DedupeEnabled: true, KeepLast: 0, CreatedAt: job.CreationTimestamp.Time, State: "Pending", } if raw := strings.TrimSpace(job.Annotations[annotationResticRepository]); raw != "" { summary.Repository = raw } summary.DedupeEnabled = parseBoolWithDefault(job.Annotations[annotationDedupeEnabled], true) summary.KeepLast = parseIntWithDefault(job.Annotations[annotationKeepLast], 0) if job.Status.CompletionTime != nil { summary.CompletionTime = job.Status.CompletionTime.Time } switch { case job.Status.Succeeded > 0: summary.State = "Completed" case job.Status.Failed > 0: summary.State = "Failed" case job.Status.Active > 0: summary.State = "Running" } return summary } func sortBackupJobSummaries(items []BackupJobSummary) { sort.Slice(items, func(i, j int) bool { left := items[i].CompletionTime if left.IsZero() { left = items[i].CreatedAt } right := items[j].CompletionTime if right.IsZero() { right = items[j].CreatedAt } if left.Equal(right) { return items[i].Name > items[j].Name } return left.After(right) }) } func (c *Client) CreateBackupJob(ctx context.Context, cfg *config.Config, req api.BackupRequest) (string, string, error) { if req.Namespace == "" { return "", "", errors.New("namespace is required") } if req.PVC == "" { return "", "", errors.New("pvc is required") } if req.Snapshot { return "", "", errors.New("snapshot support is not implemented yet") } jobName := jobName("backup", req.PVC) secretName := fmt.Sprintf("soteria-%s-restic", jobName) dedupeEnabled := dedupeEnabled(req.Dedupe) keepLast := keepLastWithDefault(req.KeepLast) repository := resticRepositoryForBackup(cfg.ResticRepository, req.Namespace, req.PVC, dedupeEnabled) if req.DryRun { return jobName, secretName, nil } secret, err := c.copySecret(ctx, cfg.SecretNamespace, cfg.ResticSecretName, req.Namespace, secretName, map[string]string{ labelAppName: "soteria", labelComponent: "restic", labelAction: "backup", labelPVC: req.PVC, }) if err != nil { return "", "", err } job := buildBackupJob(cfg, req, jobName, secretName, repository, dedupeEnabled, keepLast) if nodeName, err := c.resolvePVCMountedNode(ctx, req.Namespace, req.PVC); err == nil && nodeName != "" { job.Spec.Template.Spec.NodeName = nodeName } created, err := c.Clientset.BatchV1().Jobs(req.Namespace).Create(ctx, job, metav1.CreateOptions{}) if err != nil { _ = c.Clientset.CoreV1().Secrets(req.Namespace).Delete(ctx, secretName, metav1.DeleteOptions{}) return "", "", err } if err := c.bindSecretToJob(ctx, req.Namespace, secret.Name, created); err != nil { return jobName, secretName, err } return jobName, secretName, nil } func (c *Client) resolvePVCMountedNode(ctx context.Context, namespace, pvc string) (string, error) { pods, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) if err != nil { return "", err } for _, pod := range pods.Items { if pod.DeletionTimestamp != nil || pod.Spec.NodeName == "" { continue } if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { continue } for _, volume := range pod.Spec.Volumes { claim := volume.PersistentVolumeClaim if claim == nil { continue } if claim.ClaimName == pvc { return pod.Spec.NodeName, nil } } } return "", nil } func (c *Client) CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error) { if req.Namespace == "" { return "", "", errors.New("namespace is required") } snapshot := req.Snapshot if snapshot == "" { snapshot = "latest" } jobName := jobName("restore", snapshot) secretName := fmt.Sprintf("soteria-%s-restic", jobName) repository := strings.TrimSpace(req.Repository) if repository == "" { repository = cfg.ResticRepository } if req.DryRun { return jobName, secretName, nil } secret, err := c.copySecret(ctx, cfg.SecretNamespace, cfg.ResticSecretName, req.Namespace, secretName, map[string]string{ labelAppName: "soteria", labelComponent: "restic", labelAction: "restore", }) if err != nil { return "", "", err } job := buildRestoreJob(cfg, req, jobName, secretName, snapshot, repository) created, err := c.Clientset.BatchV1().Jobs(req.Namespace).Create(ctx, job, metav1.CreateOptions{}) if err != nil { _ = c.Clientset.CoreV1().Secrets(req.Namespace).Delete(ctx, secretName, metav1.DeleteOptions{}) return "", "", err } if err := c.bindSecretToJob(ctx, req.Namespace, secret.Name, created); err != nil { return jobName, secretName, err } return jobName, secretName, nil } func buildBackupJob(cfg *config.Config, req api.BackupRequest, jobName, secretName, repository string, dedupeEnabled bool, keepLast int) *batchv1.Job { labels := map[string]string{ labelAppName: "soteria", labelComponent: "backup", labelAction: "backup", labelPVC: req.PVC, } annotations := map[string]string{ annotationResticRepository: repository, annotationDedupeEnabled: strconv.FormatBool(dedupeEnabled), annotationKeepLast: strconv.Itoa(keepLast), } command := backupCommand(cfg, req) pod := corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyNever, Containers: []corev1.Container{ { Name: "restic", Image: cfg.ResticImage, ImagePullPolicy: corev1.PullIfNotPresent, Command: []string{"/bin/sh", "-c"}, Args: []string{command}, Env: resticEnv(cfg, secretName, repository), VolumeMounts: []corev1.VolumeMount{ {Name: "data", MountPath: "/data", ReadOnly: true}, {Name: "cache", MountPath: "/cache"}, }, }, }, Volumes: []corev1.Volume{ { Name: "data", VolumeSource: corev1.VolumeSource{ PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ ClaimName: req.PVC, ReadOnly: true, }, }, }, { Name: "cache", VolumeSource: corev1.VolumeSource{ EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, }, } if len(cfg.JobNodeSelector) > 0 { pod.NodeSelector = cfg.JobNodeSelector } if cfg.WorkerServiceAccount != "" { pod.ServiceAccountName = cfg.WorkerServiceAccount } return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: jobName, Namespace: req.Namespace, Labels: labels, Annotations: annotations, }, Spec: batchv1.JobSpec{ BackoffLimit: int32Ptr(0), TTLSecondsAfterFinished: int32Ptr(cfg.JobTTLSeconds), Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{Labels: labels, Annotations: annotations}, Spec: pod, }, }, } } func buildRestoreJob(cfg *config.Config, req api.RestoreTestRequest, jobName, secretName, snapshot, repository string) *batchv1.Job { labels := map[string]string{ labelAppName: "soteria", labelComponent: "restore", labelAction: "restore", } annotations := map[string]string{ annotationResticRepository: repository, } command := restoreCommand(snapshot) pod := corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyNever, Containers: []corev1.Container{ { Name: "restic", Image: cfg.ResticImage, ImagePullPolicy: corev1.PullIfNotPresent, Command: []string{"/bin/sh", "-c"}, Args: []string{command}, Env: resticEnv(cfg, secretName, repository), VolumeMounts: []corev1.VolumeMount{ {Name: "restore", MountPath: "/restore"}, {Name: "cache", MountPath: "/cache"}, }, }, }, Volumes: []corev1.Volume{ { Name: "restore", VolumeSource: corev1.VolumeSource{ EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, { Name: "cache", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}, }, }, } if len(cfg.JobNodeSelector) > 0 { pod.NodeSelector = cfg.JobNodeSelector } if req.TargetPVC != "" { pod.Volumes[0] = corev1.Volume{ Name: "restore", VolumeSource: corev1.VolumeSource{ PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ ClaimName: req.TargetPVC, ReadOnly: false, }, }, } labels[labelPVC] = req.TargetPVC } if cfg.WorkerServiceAccount != "" { pod.ServiceAccountName = cfg.WorkerServiceAccount } return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: jobName, Namespace: req.Namespace, Labels: labels, Annotations: annotations, }, Spec: batchv1.JobSpec{ BackoffLimit: int32Ptr(0), TTLSecondsAfterFinished: int32Ptr(cfg.JobTTLSeconds), Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{Labels: labels, Annotations: annotations}, Spec: pod, }, }, } } func backupCommand(cfg *config.Config, req api.BackupRequest) string { mode := "on" if !dedupeEnabled(req.Dedupe) { mode = "off" } keepLast := keepLastWithDefault(req.KeepLast) args := []string{ "restic", "backup", "/data", "--host", "soteria", "--tag", "soteria", "--tag", fmt.Sprintf("pvc=%s", req.PVC), "--tag", fmt.Sprintf("dedupe=%s", mode), } for _, tag := range req.Tags { tag = strings.TrimSpace(tag) if tag == "" { continue } args = append(args, "--tag", tag) } args = append(args, cfg.ResticBackupArgs...) cmd := strings.Join(args, " ") if keepLast > 0 { forget := strings.Join([]string{ "restic", "forget", "--host", "soteria", "--group-by", "host,tags", "--tag", fmt.Sprintf("pvc=%s", req.PVC), "--keep-last", strconv.Itoa(keepLast), "--prune", }, " ") cmd = fmt.Sprintf("%s && %s", cmd, forget) } else if len(cfg.ResticForgetArgs) > 0 { forget := strings.Join(append([]string{"restic", "forget"}, cfg.ResticForgetArgs...), " ") cmd = fmt.Sprintf("%s && %s", cmd, forget) } bootstrap := "restic snapshots >/dev/null 2>&1 || restic init" return "set -euo pipefail; " + bootstrap + "; " + cmd } func restoreCommand(snapshot string) string { return fmt.Sprintf( "set -euo pipefail; rm -rf /cache/restore && mkdir -p /cache/restore; restic restore %s --target /cache/restore; if [ -d /cache/restore/data ]; then cp -a /cache/restore/data/. /restore/; else cp -a /cache/restore/. /restore/; fi", snapshot, ) } func resticEnv(cfg *config.Config, secretName, repository string) []corev1.EnvVar { if strings.TrimSpace(repository) == "" { repository = cfg.ResticRepository } env := []corev1.EnvVar{ {Name: "RESTIC_REPOSITORY", Value: repository}, {Name: "RESTIC_CACHE_DIR", Value: "/cache"}, { Name: "AWS_ACCESS_KEY_ID", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: secretName}, Key: "AWS_ACCESS_KEY_ID"}}, }, { Name: "AWS_SECRET_ACCESS_KEY", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: secretName}, Key: "AWS_SECRET_ACCESS_KEY"}}, }, { Name: "RESTIC_PASSWORD", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: secretName}, Key: "RESTIC_PASSWORD"}}, }, } if cfg.S3Endpoint != "" { env = append(env, corev1.EnvVar{Name: "RESTIC_S3_ENDPOINT", Value: cfg.S3Endpoint}) env = append(env, corev1.EnvVar{Name: "AWS_ENDPOINT", Value: cfg.S3Endpoint}) } if cfg.S3Region != "" { env = append(env, corev1.EnvVar{Name: "AWS_REGION", Value: cfg.S3Region}) env = append(env, corev1.EnvVar{Name: "AWS_DEFAULT_REGION", Value: cfg.S3Region}) } return env } func (c *Client) copySecret(ctx context.Context, srcNS, srcName, dstNS, dstName string, labels map[string]string) (*corev1.Secret, error) { secret, err := c.Clientset.CoreV1().Secrets(srcNS).Get(ctx, srcName, metav1.GetOptions{}) if err != nil { return nil, fmt.Errorf("read secret %s/%s: %w", srcNS, srcName, err) } copy := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: dstName, Namespace: dstNS, Labels: labels, }, Type: secret.Type, Data: secret.Data, } created, err := c.Clientset.CoreV1().Secrets(dstNS).Create(ctx, copy, metav1.CreateOptions{}) if err != nil { return nil, fmt.Errorf("create secret %s/%s: %w", dstNS, dstName, err) } return created, nil } func (c *Client) bindSecretToJob(ctx context.Context, namespace, secretName string, job *batchv1.Job) error { secret, err := c.Clientset.CoreV1().Secrets(namespace).Get(ctx, secretName, metav1.GetOptions{}) if err != nil { return err } controller := true secret.OwnerReferences = append(secret.OwnerReferences, metav1.OwnerReference{ APIVersion: "batch/v1", Kind: "Job", Name: job.Name, UID: job.UID, Controller: &controller, }) _, err = c.Clientset.CoreV1().Secrets(namespace).Update(ctx, secret, metav1.UpdateOptions{}) return err } func jobName(action, suffix string) string { base := sanitizeName(fmt.Sprintf("soteria-%s-%s", action, suffix)) timestamp := time.Now().UTC().Format("20060102-150405") name := fmt.Sprintf("%s-%s", base, timestamp) if len(name) <= 63 { return name } trimmed := base maxBase := 63 - len(timestamp) - 1 if maxBase < 1 { maxBase = 1 } if len(trimmed) > maxBase { trimmed = trimmed[:maxBase] } return fmt.Sprintf("%s-%s", trimmed, timestamp) } func sanitizeName(value string) string { value = strings.ToLower(value) value = strings.ReplaceAll(value, "_", "-") value = strings.ReplaceAll(value, ".", "-") value = strings.ReplaceAll(value, " ", "-") value = strings.Trim(value, "-") return value } func dedupeEnabled(raw *bool) bool { if raw == nil { return true } return *raw } func keepLastWithDefault(raw *int) int { if raw == nil { return 0 } if *raw < 0 { return 0 } return *raw } func resticRepositoryForBackup(base, namespace, pvc string, dedupe bool) string { if dedupe { return strings.TrimSpace(base) } ns := sanitizeRepositorySegment(namespace) pvcName := sanitizeRepositorySegment(pvc) suffix := strings.Trim(strings.Join([]string{"isolated", ns, pvcName}, "/"), "/") return appendRepositoryPath(base, suffix) } func sanitizeRepositorySegment(value string) string { sanitized := sanitizeName(value) if sanitized == "" { return "unknown" } return sanitized } func appendRepositoryPath(base, suffix string) string { base = strings.TrimSpace(base) suffix = strings.Trim(suffix, "/") if base == "" || suffix == "" { return base } backendPrefix := "" location := base if idx := strings.Index(base, ":"); idx > 0 { backendPrefix = base[:idx+1] location = base[idx+1:] } location = strings.TrimRight(location, "/") if location == "" { return base } return backendPrefix + location + "/" + suffix } func parseBoolWithDefault(raw string, fallback bool) bool { value := strings.ToLower(strings.TrimSpace(raw)) if value == "" { return fallback } switch value { case "1", "true", "yes", "on": return true case "0", "false", "no", "off": return false default: return fallback } } func parseIntWithDefault(raw string, fallback int) int { parsed, err := strconv.Atoi(strings.TrimSpace(raw)) if err != nil { return fallback } if parsed < 0 { return fallback } return parsed } func int32Ptr(val int32) *int32 { return &val }