scheduler: prevent policy backup job storms

Brad Stein 2026-04-17 00:35:41 -03:00
parent af6de62ca2
commit 8161a4dae8
2 changed files with 276 additions and 8 deletions


@@ -1475,7 +1475,21 @@ func (s *Server) runPolicyCycle(ctx context.Context) {
if !ok {
continue
}
// Never enqueue a new policy backup while one is already active for this PVC.
// This prevents runaway job storms when a backup is stuck Pending/Running.
if pvc.ActiveBackups > 0 {
s.metrics.RecordPolicyBackup("in_progress")
continue
}
lastRunRef := strings.TrimSpace(pvc.LastBackupAt)
if lastRunRef == "" {
// If no successful backup exists yet, fall back to the most recent job start
// so failed attempts are still throttled by interval_hours.
lastRunRef = strings.TrimSpace(pvc.LastJobStartedAt)
}
if !backupDue(lastRunRef, effective.IntervalHours) {
s.metrics.RecordPolicyBackup("not_due")
continue
}
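The hunk leans on the existing backupDue helper, which this diff does not touch. As a rough sketch of the contract the new code appears to assume — RFC3339 string timestamps, an int interval, and empty or unparseable values treated as due — where the types and format are assumptions, not confirmed by this commit:

// Sketch only; the real backupDue lives outside this hunk. Needs "time".
func backupDue(lastRun string, intervalHours int) bool {
	if lastRun == "" {
		return true // never attempted: due immediately
	}
	t, err := time.Parse(time.RFC3339, lastRun)
	if err != nil {
		return true // unreadable timestamp: treat as due rather than stall forever
	}
	return time.Since(t) >= time.Duration(intervalHours)*time.Hour
}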


@@ -23,6 +23,8 @@ type fakeKubeClient struct {
pvcs []k8s.PVCSummary
backupJobs map[string][]k8s.BackupJobSummary
jobLogs map[string]string
backupJobCalls int
simulateJobListMutation bool
lastBackupReq api.BackupRequest
lastRestoreReq api.RestoreTestRequest
targetExists bool
@@ -34,7 +36,23 @@ func (f *fakeKubeClient) ResolvePVCVolume(_ context.Context, namespace, pvcName
}
func (f *fakeKubeClient) CreateBackupJob(_ context.Context, _ *config.Config, req api.BackupRequest) (string, string, error) {
f.backupJobCalls++
f.lastBackupReq = req
if f.simulateJobListMutation {
if f.backupJobs == nil {
f.backupJobs = map[string][]k8s.BackupJobSummary{}
}
key := req.Namespace + "/" + req.PVC
f.backupJobs[key] = append([]k8s.BackupJobSummary{
{
Name: fmt.Sprintf("soteria-backup-%s-%d", req.PVC, f.backupJobCalls),
Namespace: req.Namespace,
PVC: req.PVC,
CreatedAt: time.Now().UTC(),
State: "Pending",
},
}, f.backupJobs[key]...)
}
return "backup-job", "backup-secret", nil return "backup-job", "backup-secret", nil
} }
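When simulateJobListMutation is set, every CreateBackupJob call prepends a Pending job to the fake's job list, mimicking a real cluster where a freshly created job immediately shows up in subsequent listings. The aggregation that turns that list into the ActiveBackups / LastJobStartedAt fields checked by runPolicyCycle is not part of this diff; a plausible shape, offered purely as an assumed sketch:

// Hypothetical sketch; the real aggregation lives elsewhere in the server.
// Counts Pending/Running jobs as active and reports the newest CreatedAt,
// formatted as a string to match the PVC summary fields used above.
func summarizeJobs(jobs []k8s.BackupJobSummary) (active int, lastStart string) {
	var newest time.Time
	for _, j := range jobs {
		if j.State == "Pending" || j.State == "Running" {
			active++
		}
		if j.CreatedAt.After(newest) {
			newest = j.CreatedAt
		}
	}
	if !newest.IsZero() {
		lastStart = newest.Format(time.RFC3339)
	}
	return active, lastStart
}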
@@ -880,6 +898,242 @@ func TestLoadPoliciesDefaultsDedupeEnabledWhenMissing(t *testing.T) {
}
}
func TestRunPolicyCycleSkipsWhenBackupAlreadyInProgress(t *testing.T) {
startedAt := time.Now().UTC().Add(-2 * time.Hour)
kube := &fakeKubeClient{
pvcs: []k8s.PVCSummary{
{Namespace: "apps", Name: "data", VolumeName: "pv-apps-data", Phase: "Bound"},
},
backupJobs: map[string][]k8s.BackupJobSummary{
"apps/data": {
{
Name: "soteria-backup-data-20260413-010000",
Namespace: "apps",
PVC: "data",
CreatedAt: startedAt,
State: "Running",
},
},
},
}
srv := &Server{
cfg: &config.Config{
AuthRequired: false,
BackupDriver: "restic",
BackupMaxAge: 24 * time.Hour,
},
client: kube,
longhorn: &fakeLonghornClient{},
metrics: newTelemetry(),
policies: map[string]api.BackupPolicy{
"apps__data": {
ID: "apps__data",
Namespace: "apps",
PVC: "data",
IntervalHours: 1,
Enabled: true,
},
},
}
srv.runPolicyCycle(context.Background())
if kube.backupJobCalls != 0 {
t.Fatalf("expected policy scheduler to skip while backup is in progress, got %d backup request(s)", kube.backupJobCalls)
}
}
func TestRunPolicyCycleUsesLastAttemptTimestampWhenNoCompletedBackup(t *testing.T) {
startedAt := time.Now().UTC().Add(-30 * time.Minute)
kube := &fakeKubeClient{
pvcs: []k8s.PVCSummary{
{Namespace: "apps", Name: "data", VolumeName: "pv-apps-data", Phase: "Bound"},
},
backupJobs: map[string][]k8s.BackupJobSummary{
"apps/data": {
{
Name: "soteria-backup-data-20260413-010000",
Namespace: "apps",
PVC: "data",
CreatedAt: startedAt,
CompletionTime: startedAt.Add(5 * time.Minute),
State: "Failed",
},
},
},
}
srv := &Server{
cfg: &config.Config{
AuthRequired: false,
BackupDriver: "restic",
BackupMaxAge: 24 * time.Hour,
},
client: kube,
longhorn: &fakeLonghornClient{},
metrics: newTelemetry(),
policies: map[string]api.BackupPolicy{
"apps__data": {
ID: "apps__data",
Namespace: "apps",
PVC: "data",
IntervalHours: 1,
Enabled: true,
},
},
}
srv.runPolicyCycle(context.Background())
if kube.backupJobCalls != 0 {
t.Fatalf("expected recent failed attempt to defer policy backup, got %d backup request(s)", kube.backupJobCalls)
}
}
func TestRunPolicyCycleRetriesAfterIntervalFromLastAttempt(t *testing.T) {
startedAt := time.Now().UTC().Add(-2 * time.Hour)
kube := &fakeKubeClient{
pvcs: []k8s.PVCSummary{
{Namespace: "apps", Name: "data", VolumeName: "pv-apps-data", Phase: "Bound"},
},
backupJobs: map[string][]k8s.BackupJobSummary{
"apps/data": {
{
Name: "soteria-backup-data-20260413-010000",
Namespace: "apps",
PVC: "data",
CreatedAt: startedAt,
CompletionTime: startedAt.Add(5 * time.Minute),
State: "Failed",
},
},
},
}
srv := &Server{
cfg: &config.Config{
AuthRequired: false,
BackupDriver: "restic",
BackupMaxAge: 24 * time.Hour,
},
client: kube,
longhorn: &fakeLonghornClient{},
metrics: newTelemetry(),
policies: map[string]api.BackupPolicy{
"apps__data": {
ID: "apps__data",
Namespace: "apps",
PVC: "data",
IntervalHours: 1,
Enabled: true,
},
},
}
srv.runPolicyCycle(context.Background())
if kube.backupJobCalls != 1 {
t.Fatalf("expected one policy backup after interval elapsed, got %d", kube.backupJobCalls)
}
if kube.lastBackupReq.Namespace != "apps" || kube.lastBackupReq.PVC != "data" {
t.Fatalf("unexpected policy backup target: %#v", kube.lastBackupReq)
}
}
func TestRunPolicyCycleDoesNotStormAcrossRepeatedTicksWhenBackupInProgress(t *testing.T) {
startedAt := time.Now().UTC().Add(-5 * time.Minute)
kube := &fakeKubeClient{
pvcs: []k8s.PVCSummary{
{Namespace: "apps", Name: "data", VolumeName: "pv-apps-data", Phase: "Bound"},
},
backupJobs: map[string][]k8s.BackupJobSummary{
"apps/data": {
{
Name: "soteria-backup-data-active",
Namespace: "apps",
PVC: "data",
CreatedAt: startedAt,
State: "Running",
},
},
},
}
srv := &Server{
cfg: &config.Config{
AuthRequired: false,
BackupDriver: "restic",
BackupMaxAge: 24 * time.Hour,
},
client: kube,
longhorn: &fakeLonghornClient{},
metrics: newTelemetry(),
policies: map[string]api.BackupPolicy{
"apps__data": {
ID: "apps__data",
Namespace: "apps",
PVC: "data",
IntervalHours: 1,
Enabled: true,
},
},
}
for range 30 {
srv.runPolicyCycle(context.Background())
}
if kube.backupJobCalls != 0 {
t.Fatalf("expected no policy backups while active job exists across repeated ticks, got %d", kube.backupJobCalls)
}
}
func TestRunPolicyCycleCreatesAtMostOneJobAcrossRepeatedTicksAfterDueFailure(t *testing.T) {
startedAt := time.Now().UTC().Add(-2 * time.Hour)
kube := &fakeKubeClient{
pvcs: []k8s.PVCSummary{
{Namespace: "apps", Name: "data", VolumeName: "pv-apps-data", Phase: "Bound"},
},
backupJobs: map[string][]k8s.BackupJobSummary{
"apps/data": {
{
Name: "soteria-backup-data-old-failed",
Namespace: "apps",
PVC: "data",
CreatedAt: startedAt,
CompletionTime: startedAt.Add(5 * time.Minute),
State: "Failed",
},
},
},
simulateJobListMutation: true,
}
srv := &Server{
cfg: &config.Config{
AuthRequired: false,
BackupDriver: "restic",
BackupMaxAge: 24 * time.Hour,
},
client: kube,
longhorn: &fakeLonghornClient{},
metrics: newTelemetry(),
policies: map[string]api.BackupPolicy{
"apps__data": {
ID: "apps__data",
Namespace: "apps",
PVC: "data",
IntervalHours: 1,
Enabled: true,
},
},
}
for range 20 {
srv.runPolicyCycle(context.Background())
}
if kube.backupJobCalls != 1 {
t.Fatalf("expected exactly one policy backup after due failure and then suppression by in-progress job, got %d", kube.backupJobCalls)
}
}
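Taken together, the last two tests pin down the storm fix from both sides: an active job suppresses new work across repeated ticks, and a due retry fires exactly once before the freshly created Pending job suppresses it again. The new cases can be run on their own with:

go test -run 'TestRunPolicyCycle' ./...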
func TestNamespaceBackupDryRun(t *testing.T) {
srv := &Server{
cfg: &config.Config{