diff --git a/internal/server/server.go b/internal/server/server.go
index 6c1f443..c50f8e2 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -1475,7 +1475,21 @@ func (s *Server) runPolicyCycle(ctx context.Context) {
 		if !ok {
 			continue
 		}
-		if !backupDue(pvc.LastBackupAt, effective.IntervalHours) {
+		// Never enqueue a new policy backup while one is already active for this PVC.
+		// This prevents runaway job storms when a backup is stuck Pending/Running.
+		if pvc.ActiveBackups > 0 {
+			s.metrics.RecordPolicyBackup("in_progress")
+			continue
+		}
+
+		lastRunRef := strings.TrimSpace(pvc.LastBackupAt)
+		if lastRunRef == "" {
+			// If no successful backup exists yet, fall back to the most recent job start
+			// so failed attempts are still throttled by interval_hours.
+			lastRunRef = strings.TrimSpace(pvc.LastJobStartedAt)
+		}
+
+		if !backupDue(lastRunRef, effective.IntervalHours) {
 			s.metrics.RecordPolicyBackup("not_due")
 			continue
 		}
diff --git a/internal/server/server_test.go b/internal/server/server_test.go
index d2c6d5d..f59148b 100644
--- a/internal/server/server_test.go
+++ b/internal/server/server_test.go
@@ -20,13 +20,15 @@ import (
 )
 
 type fakeKubeClient struct {
-	pvcs           []k8s.PVCSummary
-	backupJobs     map[string][]k8s.BackupJobSummary
-	jobLogs        map[string]string
-	lastBackupReq  api.BackupRequest
-	lastRestoreReq api.RestoreTestRequest
-	targetExists   bool
-	secretData     map[string][]byte
+	pvcs                    []k8s.PVCSummary
+	backupJobs              map[string][]k8s.BackupJobSummary
+	jobLogs                 map[string]string
+	backupJobCalls          int
+	simulateJobListMutation bool
+	lastBackupReq           api.BackupRequest
+	lastRestoreReq          api.RestoreTestRequest
+	targetExists            bool
+	secretData              map[string][]byte
 }
 
 func (f *fakeKubeClient) ResolvePVCVolume(_ context.Context, namespace, pvcName string) (string, *corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error) {
@@ -34,7 +36,23 @@ func (f *fakeKubeClient) CreateBackupJob(_ context.Context, _ *config.Config, req api.BackupRequest) (string, string, error) {
+	f.backupJobCalls++
 	f.lastBackupReq = req
+	if f.simulateJobListMutation {
+		if f.backupJobs == nil {
+			f.backupJobs = map[string][]k8s.BackupJobSummary{}
+		}
+		key := req.Namespace + "/" + req.PVC
+		f.backupJobs[key] = append([]k8s.BackupJobSummary{
+			{
+				Name:      fmt.Sprintf("soteria-backup-%s-%d", req.PVC, f.backupJobCalls),
+				Namespace: req.Namespace,
+				PVC:       req.PVC,
+				CreatedAt: time.Now().UTC(),
+				State:     "Pending",
+			},
+		}, f.backupJobs[key]...)
+	}
 	return "backup-job", "backup-secret", nil
 }
 
@@ -880,6 +898,242 @@ func TestLoadPoliciesDefaultsDedupeEnabledWhenMissing(t *testing.T) {
 	}
 }
 
+func TestRunPolicyCycleSkipsWhenBackupAlreadyInProgress(t *testing.T) {
+	startedAt := time.Now().UTC().Add(-2 * time.Hour)
+	kube := &fakeKubeClient{
+		pvcs: []k8s.PVCSummary{
+			{Namespace: "apps", Name: "data", VolumeName: "pv-apps-data", Phase: "Bound"},
+		},
+		backupJobs: map[string][]k8s.BackupJobSummary{
+			"apps/data": {
+				{
+					Name:      "soteria-backup-data-20260413-010000",
+					Namespace: "apps",
+					PVC:       "data",
+					CreatedAt: startedAt,
+					State:     "Running",
+				},
+			},
+		},
+	}
+	srv := &Server{
+		cfg: &config.Config{
+			AuthRequired: false,
+			BackupDriver: "restic",
+			BackupMaxAge: 24 * time.Hour,
+		},
+		client:   kube,
+		longhorn: &fakeLonghornClient{},
+		metrics:  newTelemetry(),
+		policies: map[string]api.BackupPolicy{
+			"apps__data": {
+				ID:            "apps__data",
+				Namespace:     "apps",
+				PVC:           "data",
+				IntervalHours: 1,
+				Enabled:       true,
+			},
+		},
+	}
+
+	srv.runPolicyCycle(context.Background())
+
+	if kube.backupJobCalls != 0 {
+		t.Fatalf("expected policy scheduler to skip while backup is in progress, got %d backup request(s)", kube.backupJobCalls)
+	}
+}
+
+func TestRunPolicyCycleUsesLastAttemptTimestampWhenNoCompletedBackup(t *testing.T) {
+	startedAt := time.Now().UTC().Add(-30 * time.Minute)
+	kube := &fakeKubeClient{
+		pvcs: []k8s.PVCSummary{
+			{Namespace: "apps", Name: "data", VolumeName: "pv-apps-data", Phase: "Bound"},
+		},
+		backupJobs: map[string][]k8s.BackupJobSummary{
+			"apps/data": {
+				{
+					Name:           "soteria-backup-data-20260413-010000",
+					Namespace:      "apps",
+					PVC:            "data",
+					CreatedAt:      startedAt,
+					CompletionTime: startedAt.Add(5 * time.Minute),
+					State:          "Failed",
+				},
+			},
+		},
+	}
+	srv := &Server{
+		cfg: &config.Config{
+			AuthRequired: false,
+			BackupDriver: "restic",
+			BackupMaxAge: 24 * time.Hour,
+		},
+		client:   kube,
+		longhorn: &fakeLonghornClient{},
+		metrics:  newTelemetry(),
+		policies: map[string]api.BackupPolicy{
+			"apps__data": {
+				ID:            "apps__data",
+				Namespace:     "apps",
+				PVC:           "data",
+				IntervalHours: 1,
+				Enabled:       true,
+			},
+		},
+	}
+
+	srv.runPolicyCycle(context.Background())
+
+	if kube.backupJobCalls != 0 {
+		t.Fatalf("expected recent failed attempt to defer policy backup, got %d backup request(s)", kube.backupJobCalls)
+	}
+}
+
+func TestRunPolicyCycleRetriesAfterIntervalFromLastAttempt(t *testing.T) {
+	startedAt := time.Now().UTC().Add(-2 * time.Hour)
+	kube := &fakeKubeClient{
+		pvcs: []k8s.PVCSummary{
+			{Namespace: "apps", Name: "data", VolumeName: "pv-apps-data", Phase: "Bound"},
+		},
+		backupJobs: map[string][]k8s.BackupJobSummary{
+			"apps/data": {
+				{
+					Name:           "soteria-backup-data-20260413-010000",
+					Namespace:      "apps",
+					PVC:            "data",
+					CreatedAt:      startedAt,
+					CompletionTime: startedAt.Add(5 * time.Minute),
+					State:          "Failed",
+				},
+			},
+		},
+	}
+	srv := &Server{
+		cfg: &config.Config{
+			AuthRequired: false,
+			BackupDriver: "restic",
+			BackupMaxAge: 24 * time.Hour,
+		},
+		client:   kube,
+		longhorn: &fakeLonghornClient{},
+		metrics:  newTelemetry(),
+		policies: map[string]api.BackupPolicy{
+			"apps__data": {
+				ID:            "apps__data",
+				Namespace:     "apps",
+				PVC:           "data",
+				IntervalHours: 1,
+				Enabled:       true,
+			},
+		},
+	}
+
+	srv.runPolicyCycle(context.Background())
+
+	if kube.backupJobCalls != 1 {
+		t.Fatalf("expected one policy backup after interval elapsed, got %d", kube.backupJobCalls)
+	}
+	if kube.lastBackupReq.Namespace != "apps" || kube.lastBackupReq.PVC != "data" {
+		t.Fatalf("unexpected policy backup target: %#v", kube.lastBackupReq)
+	}
+}
+
+func TestRunPolicyCycleDoesNotStormAcrossRepeatedTicksWhenBackupInProgress(t *testing.T) {
+	startedAt := time.Now().UTC().Add(-5 * time.Minute)
+	kube := &fakeKubeClient{
+		pvcs: []k8s.PVCSummary{
+			{Namespace: "apps", Name: "data", VolumeName: "pv-apps-data", Phase: "Bound"},
+		},
+		backupJobs: map[string][]k8s.BackupJobSummary{
+			"apps/data": {
+				{
+					Name:      "soteria-backup-data-active",
+					Namespace: "apps",
+					PVC:       "data",
+					CreatedAt: startedAt,
+					State:     "Running",
+				},
+			},
+		},
+	}
+	srv := &Server{
+		cfg: &config.Config{
+			AuthRequired: false,
+			BackupDriver: "restic",
+			BackupMaxAge: 24 * time.Hour,
+		},
+		client:   kube,
+		longhorn: &fakeLonghornClient{},
+		metrics:  newTelemetry(),
+		policies: map[string]api.BackupPolicy{
+			"apps__data": {
+				ID:            "apps__data",
+				Namespace:     "apps",
+				PVC:           "data",
+				IntervalHours: 1,
+				Enabled:       true,
+			},
+		},
+	}
+
+	for range 30 {
+		srv.runPolicyCycle(context.Background())
+	}
+
+	if kube.backupJobCalls != 0 {
+		t.Fatalf("expected no policy backups while active job exists across repeated ticks, got %d", kube.backupJobCalls)
+	}
+}
+
+func TestRunPolicyCycleCreatesAtMostOneJobAcrossRepeatedTicksAfterDueFailure(t *testing.T) {
+	startedAt := time.Now().UTC().Add(-2 * time.Hour)
+	kube := &fakeKubeClient{
+		pvcs: []k8s.PVCSummary{
+			{Namespace: "apps", Name: "data", VolumeName: "pv-apps-data", Phase: "Bound"},
+		},
+		backupJobs: map[string][]k8s.BackupJobSummary{
+			"apps/data": {
+				{
+					Name:           "soteria-backup-data-old-failed",
+					Namespace:      "apps",
+					PVC:            "data",
+					CreatedAt:      startedAt,
+					CompletionTime: startedAt.Add(5 * time.Minute),
+					State:          "Failed",
+				},
+			},
+		},
+		simulateJobListMutation: true,
+	}
+	srv := &Server{
+		cfg: &config.Config{
+			AuthRequired: false,
+			BackupDriver: "restic",
+			BackupMaxAge: 24 * time.Hour,
+		},
+		client:   kube,
+		longhorn: &fakeLonghornClient{},
+		metrics:  newTelemetry(),
+		policies: map[string]api.BackupPolicy{
+			"apps__data": {
+				ID:            "apps__data",
+				Namespace:     "apps",
+				PVC:           "data",
+				IntervalHours: 1,
+				Enabled:       true,
+			},
+		},
+	}
+
+	for range 20 {
+		srv.runPolicyCycle(context.Background())
+	}
+
+	if kube.backupJobCalls != 1 {
+		t.Fatalf("expected exactly one policy backup after due failure and then suppression by in-progress job, got %d", kube.backupJobCalls)
+	}
+}
+
 func TestNamespaceBackupDryRun(t *testing.T) {
 	srv := &Server{
 		cfg: &config.Config{
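
For context: the scheduler hunk now feeds a fallback timestamp into backupDue, whose implementation is not part of this diff. Below is a minimal sketch of the semantics the new tests rely on, assuming the timestamp is RFC3339 and IntervalHours is an int; the real helper in internal/server may differ.

package server

import "time"

// backupDue (sketch) reports whether a policy backup should run, assuming
// lastRunRef is an RFC3339 timestamp and an empty string means "never ran".
func backupDue(lastRunRef string, intervalHours int) bool {
	if lastRunRef == "" {
		// No successful backup and no prior attempt: run immediately.
		return true
	}
	last, err := time.Parse(time.RFC3339, lastRunRef)
	if err != nil {
		// Treat an unparseable timestamp as due so a corrupt record cannot
		// permanently disable a policy. (Assumption, not confirmed by the diff.)
		return true
	}
	return time.Since(last) >= time.Duration(intervalHours)*time.Hour
}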
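The tests also depend on pvc.ActiveBackups and pvc.LastJobStartedAt being derived from the per-PVC job list, but the diff does not show where that enrichment happens. The following is a hypothetical derivation consistent with the fixtures (a job is "active" while Pending or Running; the most recent CreatedAt becomes the last attempt reference); summarizeJobs and the trimmed BackupJobSummary are illustrative, not the project's actual API.

package server

import "time"

// BackupJobSummary here carries only the fields this sketch needs; the
// real type lives in the k8s package used by the tests.
type BackupJobSummary struct {
	State     string
	CreatedAt time.Time
}

// summarizeJobs (hypothetical) computes the two scheduler inputs used by
// the runPolicyCycle hunk above from a PVC's backup job list.
func summarizeJobs(jobs []BackupJobSummary) (activeBackups int, lastJobStartedAt string) {
	var latest time.Time
	for _, job := range jobs {
		// Non-terminal jobs suppress new policy backups.
		if job.State == "Pending" || job.State == "Running" {
			activeBackups++
		}
		// Track the most recent job start for the interval_hours fallback.
		if job.CreatedAt.After(latest) {
			latest = job.CreatedAt
		}
	}
	if !latest.IsZero() {
		lastJobStartedAt = latest.Format(time.RFC3339)
	}
	return activeBackups, lastJobStartedAt
}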