diff --git a/internal/k8s/jobs.go b/internal/k8s/jobs.go index 9ff1cf5..31dc980 100644 --- a/internal/k8s/jobs.go +++ b/internal/k8s/jobs.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sort" "strings" "time" @@ -22,6 +23,61 @@ const ( labelPVC = "soteria.bstein.dev/pvc" ) +type BackupJobSummary struct { + Name string + Namespace string + PVC string + CreatedAt time.Time + CompletionTime time.Time + State string +} + +func (c *Client) ListBackupJobsForPVC(ctx context.Context, namespace, pvc string) ([]BackupJobSummary, error) { + selector := fmt.Sprintf("%s=soteria,%s=backup,%s=backup,%s=%s", labelAppName, labelComponent, labelAction, labelPVC, pvc) + jobs, err := c.Clientset.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) + if err != nil { + return nil, fmt.Errorf("list backup jobs for pvc %s/%s: %w", namespace, pvc, err) + } + + out := make([]BackupJobSummary, 0, len(jobs.Items)) + for _, job := range jobs.Items { + summary := BackupJobSummary{ + Name: job.Name, + Namespace: job.Namespace, + PVC: pvc, + CreatedAt: job.CreationTimestamp.Time, + State: "Running", + } + if job.Status.CompletionTime != nil { + summary.CompletionTime = job.Status.CompletionTime.Time + } + switch { + case job.Status.Succeeded > 0: + summary.State = "Completed" + case job.Status.Failed > 0: + summary.State = "Failed" + } + out = append(out, summary) + } + + sort.Slice(out, func(i, j int) bool { + left := out[i].CompletionTime + if left.IsZero() { + left = out[i].CreatedAt + } + right := out[j].CompletionTime + if right.IsZero() { + right = out[j].CreatedAt + } + if left.Equal(right) { + return out[i].Name > out[j].Name + } + return left.After(right) + }) + + return out, nil +} + func (c *Client) CreateBackupJob(ctx context.Context, cfg *config.Config, req api.BackupRequest) (string, string, error) { if req.Namespace == "" { return "", "", errors.New("namespace is required") @@ -206,7 +262,7 @@ func buildRestoreJob(cfg *config.Config, req api.RestoreTestRequest, jobName, se }, }, { - Name: "cache", + Name: "cache", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}, }, }, @@ -266,7 +322,8 @@ func backupCommand(cfg *config.Config, req api.BackupRequest) string { cmd = fmt.Sprintf("%s && %s", cmd, forget) } - return "set -euo pipefail; " + cmd + bootstrap := "restic snapshots >/dev/null 2>&1 || restic init" + return "set -euo pipefail; " + bootstrap + "; " + cmd } func restoreCommand(snapshot string) string { @@ -278,15 +335,15 @@ func resticEnv(cfg *config.Config, secretName string) []corev1.EnvVar { {Name: "RESTIC_REPOSITORY", Value: cfg.ResticRepository}, {Name: "RESTIC_CACHE_DIR", Value: "/cache"}, { - Name: "AWS_ACCESS_KEY_ID", + Name: "AWS_ACCESS_KEY_ID", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: secretName}, Key: "AWS_ACCESS_KEY_ID"}}, }, { - Name: "AWS_SECRET_ACCESS_KEY", + Name: "AWS_SECRET_ACCESS_KEY", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: secretName}, Key: "AWS_SECRET_ACCESS_KEY"}}, }, { - Name: "RESTIC_PASSWORD", + Name: "RESTIC_PASSWORD", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: secretName}, Key: "RESTIC_PASSWORD"}}, }, } diff --git a/internal/server/server.go b/internal/server/server.go index c21bd79..3078250 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -28,6 +28,7 @@ type kubeClient interface { ResolvePVCVolume(ctx context.Context, namespace, pvcName string) (string, *corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error) CreateBackupJob(ctx context.Context, cfg *config.Config, req api.BackupRequest) (string, string, error) CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error) + ListBackupJobsForPVC(ctx context.Context, namespace, pvc string) ([]k8s.BackupJobSummary, error) ListBoundPVCs(ctx context.Context) ([]k8s.PVCSummary, error) PersistentVolumeClaimExists(ctx context.Context, namespace, pvcName string) (bool, error) LoadSecretData(ctx context.Context, namespace, secretName, key string) ([]byte, error) @@ -261,18 +262,34 @@ func (s *Server) handleBackups(w http.ResponseWriter, r *http.Request) { return } - backups, err := s.longhorn.ListBackups(r.Context(), volumeName) - if err != nil { - writeError(w, http.StatusBadGateway, err.Error()) - return + switch s.cfg.BackupDriver { + case "longhorn": + backups, err := s.longhorn.ListBackups(r.Context(), volumeName) + if err != nil { + writeError(w, http.StatusBadGateway, err.Error()) + return + } + writeJSON(w, http.StatusOK, api.BackupListResponse{ + Namespace: namespace, + PVC: pvcName, + Volume: volumeName, + Backups: buildBackupRecords(backups), + }) + case "restic": + jobs, err := s.client.ListBackupJobsForPVC(r.Context(), namespace, pvcName) + if err != nil { + writeError(w, http.StatusBadGateway, err.Error()) + return + } + writeJSON(w, http.StatusOK, api.BackupListResponse{ + Namespace: namespace, + PVC: pvcName, + Volume: volumeName, + Backups: buildResticBackupRecords(jobs), + }) + default: + writeError(w, http.StatusBadRequest, "unsupported backup driver") } - - writeJSON(w, http.StatusOK, api.BackupListResponse{ - Namespace: namespace, - PVC: pvcName, - Volume: volumeName, - Backups: buildBackupRecords(backups), - }) } func (s *Server) handleBackup(w http.ResponseWriter, r *http.Request) { @@ -911,9 +928,46 @@ func (s *Server) enrichPVCInventory(ctx context.Context, entry *api.PVCInventory entry.HealthReason = "stale" } case "restic": - entry.Healthy = false - entry.HealthReason = "inventory_unavailable" - entry.Error = "restic inventory telemetry is not implemented yet" + jobs, err := s.client.ListBackupJobsForPVC(ctx, entry.Namespace, entry.PVC) + if err != nil { + entry.Healthy = false + entry.HealthReason = "lookup_failed" + entry.Error = err.Error() + return + } + entry.BackupCount = len(jobs) + + completed := make([]k8s.BackupJobSummary, 0, len(jobs)) + for _, job := range jobs { + if strings.EqualFold(job.State, "Completed") { + completed = append(completed, job) + } + } + entry.CompletedBackups = len(completed) + if len(completed) == 0 { + entry.Healthy = false + if len(jobs) == 0 { + entry.HealthReason = "missing" + } else { + entry.HealthReason = "no_completed" + } + return + } + + latestTime := backupJobTimestamp(completed[0]) + if latestTime.IsZero() { + entry.Healthy = false + entry.HealthReason = "unknown_timestamp" + return + } + entry.LastBackupAt = latestTime.UTC().Format(time.RFC3339) + entry.LastBackupAgeHours = roundHours(time.Since(latestTime).Hours()) + entry.Healthy = time.Since(latestTime) <= s.cfg.BackupMaxAge + if entry.Healthy { + entry.HealthReason = "fresh" + } else { + entry.HealthReason = "stale" + } default: entry.Healthy = false entry.HealthReason = "unsupported_driver" @@ -939,10 +993,6 @@ func (s *Server) runPolicyCycle(ctx context.Context) { } defer s.endRun() - if s.cfg.BackupDriver != "longhorn" { - return - } - policies := s.activePolicies() if len(policies) == 0 { return @@ -1408,6 +1458,46 @@ func buildBackupRecords(backups []longhorn.Backup) []api.BackupRecord { return records } +func buildResticBackupRecords(jobs []k8s.BackupJobSummary) []api.BackupRecord { + records := make([]api.BackupRecord, 0, len(jobs)) + latestName := "" + for _, job := range jobs { + if strings.EqualFold(job.State, "Completed") { + latestName = job.Name + break + } + } + + for _, job := range jobs { + created := "" + if ts := backupJobTimestamp(job); !ts.IsZero() { + created = ts.UTC().Format(time.RFC3339) + } + url := "" + latest := job.Name == latestName + if latest && strings.EqualFold(job.State, "Completed") { + // The restore API defaults to "latest"; expose one selectable option in the UI. + url = "latest" + } + records = append(records, api.BackupRecord{ + Name: job.Name, + SnapshotName: job.Name, + Created: created, + State: job.State, + URL: url, + Latest: latest, + }) + } + return records +} + +func backupJobTimestamp(job k8s.BackupJobSummary) time.Time { + if !job.CompletionTime.IsZero() { + return job.CompletionTime + } + return job.CreatedAt +} + func latestCompletedBackup(backups []longhorn.Backup) (longhorn.Backup, time.Time, bool) { var selected longhorn.Backup var selectedTime time.Time diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 24dc753..d31bee6 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -20,6 +20,7 @@ import ( type fakeKubeClient struct { pvcs []k8s.PVCSummary + backupJobs map[string][]k8s.BackupJobSummary targetExists bool secretData map[string][]byte } @@ -40,6 +41,17 @@ func (f *fakeKubeClient) ListBoundPVCs(_ context.Context) ([]k8s.PVCSummary, err return f.pvcs, nil } +func (f *fakeKubeClient) ListBackupJobsForPVC(_ context.Context, namespace, pvc string) ([]k8s.BackupJobSummary, error) { + if f.backupJobs == nil { + return nil, nil + } + key := namespace + "/" + pvc + items := f.backupJobs[key] + out := make([]k8s.BackupJobSummary, len(items)) + copy(out, items) + return out, nil +} + func (f *fakeKubeClient) PersistentVolumeClaimExists(_ context.Context, _, _ string) (bool, error) { return f.targetExists, nil } @@ -262,6 +274,109 @@ func TestBackupCreatesSnapshotBeforeBackup(t *testing.T) { } } +func TestResticInventoryUsesCompletedBackupJobs(t *testing.T) { + completedAt := time.Now().UTC().Add(-2 * time.Hour) + srv := &Server{ + cfg: &config.Config{ + AuthRequired: false, + BackupDriver: "restic", + BackupMaxAge: 24 * time.Hour, + }, + client: &fakeKubeClient{ + pvcs: []k8s.PVCSummary{ + {Namespace: "apps", Name: "data", VolumeName: "pv-apps-data", Phase: "Bound"}, + }, + backupJobs: map[string][]k8s.BackupJobSummary{ + "apps/data": { + { + Name: "soteria-backup-data-20260413-000000", + Namespace: "apps", + PVC: "data", + CreatedAt: completedAt.Add(-2 * time.Minute), + CompletionTime: completedAt, + State: "Completed", + }, + }, + }, + }, + longhorn: &fakeLonghornClient{}, + metrics: newTelemetry(), + } + srv.handler = http.HandlerFunc(srv.route) + + req := httptest.NewRequest(http.MethodGet, "/v1/inventory", nil) + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", res.Code, res.Body.String()) + } + + var payload api.InventoryResponse + if err := json.Unmarshal(res.Body.Bytes(), &payload); err != nil { + t.Fatalf("decode inventory: %v", err) + } + if len(payload.Namespaces) != 1 || len(payload.Namespaces[0].PVCs) != 1 { + t.Fatalf("unexpected inventory payload: %#v", payload) + } + entry := payload.Namespaces[0].PVCs[0] + if !entry.Healthy { + t.Fatalf("expected healthy entry, got %#v", entry) + } + if entry.HealthReason != "fresh" { + t.Fatalf("expected fresh health reason, got %#v", entry.HealthReason) + } + if entry.CompletedBackups != 1 || entry.BackupCount != 1 { + t.Fatalf("expected one completed backup, got %#v", entry) + } +} + +func TestResticBackupsEndpointReturnsLatestSelector(t *testing.T) { + completedAt := time.Now().UTC().Add(-30 * time.Minute) + srv := &Server{ + cfg: &config.Config{ + AuthRequired: false, + BackupDriver: "restic", + }, + client: &fakeKubeClient{ + backupJobs: map[string][]k8s.BackupJobSummary{ + "apps/data": { + { + Name: "soteria-backup-data-20260413-010000", + Namespace: "apps", + PVC: "data", + CreatedAt: completedAt.Add(-2 * time.Minute), + CompletionTime: completedAt, + State: "Completed", + }, + }, + }, + }, + longhorn: &fakeLonghornClient{}, + metrics: newTelemetry(), + } + srv.handler = http.HandlerFunc(srv.route) + + req := httptest.NewRequest(http.MethodGet, "/v1/backups?namespace=apps&pvc=data", nil) + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", res.Code, res.Body.String()) + } + + var payload api.BackupListResponse + if err := json.Unmarshal(res.Body.Bytes(), &payload); err != nil { + t.Fatalf("decode backups response: %v", err) + } + if len(payload.Backups) != 1 { + t.Fatalf("expected one backup record, got %#v", payload.Backups) + } + if payload.Backups[0].URL != "latest" || !payload.Backups[0].Latest { + t.Fatalf("expected latest restic selector, got %#v", payload.Backups[0]) + } +} + func TestMetricsStayPublic(t *testing.T) { srv := &Server{ cfg: &config.Config{AuthRequired: true, AllowedGroups: []string{"admin"}},