package server

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sort"
	"strings"
	"time"

	"scm.bstein.dev/bstein/soteria/internal/api"
)

// runPolicyCycle evaluates every enabled backup policy against the current PVC
// inventory and starts a backup for each PVC whose effective policy is due.
//
// Only one cycle runs at a time: when beginRun reports that a cycle is already
// in progress the call returns immediately. The inventory build and every
// backup started by the cycle share one three-minute deadline derived from ctx.
func (s *Server) runPolicyCycle(ctx context.Context) {
	if !s.beginRun() {
		return
	}
	defer s.endRun()

	policies := s.activePolicies()
	if len(policies) == 0 {
		return
	}

	runCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
	defer cancel()

	inventory, err := s.buildInventory(runCtx)
	if err != nil {
		log.Printf("policy cycle inventory failed: %v", err)
		s.metrics.RecordPolicyBackup("inventory_error")
		return
	}

	// Index the inventory both by "namespace/pvc" (for PVC-scoped policies) and
	// by namespace (for namespace-wide policies).
	pvcMap := make(map[string]api.PVCInventory)
	namespaceMap := make(map[string][]api.PVCInventory)
	for _, group := range inventory.Namespaces {
		for _, pvc := range group.PVCs {
			key := pvc.Namespace + "/" + pvc.PVC
			pvcMap[key] = pvc
			namespaceMap[pvc.Namespace] = append(namespaceMap[pvc.Namespace], pvc)
		}
	}

	type effectivePolicy struct {
		IntervalHours float64
		Dedupe        bool
		KeepLast      int
	}

	// Resolve one effective policy per PVC. When several policies match the
	// same PVC, the shortest interval wins; on equal intervals the policy that
	// keepLastStricter prefers wins.
	effectivePolicies := map[string]effectivePolicy{}
	for _, policy := range policies {
		var matches []api.PVCInventory
		if policy.PVC != "" {
			if pvc, ok := pvcMap[policy.Namespace+"/"+policy.PVC]; ok {
				matches = append(matches, pvc)
			}
		} else {
			matches = namespaceMap[policy.Namespace]
		}

		for _, pvc := range matches {
			key := pvc.Namespace + "/" + pvc.PVC
			current, exists := effectivePolicies[key]
			if !exists ||
				policy.IntervalHours < current.IntervalHours ||
				(policy.IntervalHours == current.IntervalHours && keepLastStricter(policy.KeepLast, current.KeepLast)) {
				effectivePolicies[key] = effectivePolicy{
					IntervalHours: policy.IntervalHours,
					Dedupe:        policy.Dedupe,
					KeepLast:      policy.KeepLast,
				}
			}
		}
	}

	for key, effective := range effectivePolicies {
		// Once the cycle deadline has passed (or the parent context was
		// cancelled) every further backup attempt would be made with a dead
		// context, so stop instead of recording a failure per remaining PVC.
		if ctxErr := runCtx.Err(); ctxErr != nil {
			log.Printf("policy cycle stopped early: %v", ctxErr)
			return
		}

		pvc, ok := pvcMap[key]
		if !ok {
			continue
		}

		// Never enqueue a new policy backup while one is already active for this PVC.
		// This prevents runaway job storms when a backup is stuck Pending/Running.
		if pvc.ActiveBackups > 0 {
			s.metrics.RecordPolicyBackup("in_progress")
			continue
		}

		lastRunRef := strings.TrimSpace(pvc.LastBackupAt)
		if lastRunRef == "" {
			// If no successful backup exists yet, fall back to the most recent job start
			// so failed attempts are still throttled by interval_hours.
			lastRunRef = strings.TrimSpace(pvc.LastJobStartedAt)
		}
		if !backupDue(lastRunRef, effective.IntervalHours) {
			s.metrics.RecordPolicyBackup("not_due")
			continue
		}

		_, result, err := s.executeBackup(runCtx, api.BackupRequest{
			Namespace: pvc.Namespace,
			PVC:       pvc.PVC,
			DryRun:    false,
			Dedupe:    boolPtr(effective.Dedupe),
			KeepLast:  intPtr(effective.KeepLast),
		}, "policy-scheduler")
		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result)
		if err != nil {
			s.metrics.RecordPolicyBackup(result)
			log.Printf("policy backup failed for %s/%s: %v", pvc.Namespace, pvc.PVC, err)
			continue
		}
		s.metrics.RecordPolicyBackup("success")
	}
}

// beginRun marks a policy cycle as running. It returns false, without changing
// any state, when a cycle is already marked as running.
func (s *Server) beginRun() bool {
	s.runMu.Lock()
	defer s.runMu.Unlock()

	if s.running {
		return false
	}
	s.running = true
	return true
}

// endRun clears the running flag set by beginRun.
func (s *Server) endRun() {
	s.runMu.Lock()
	defer s.runMu.Unlock()

	s.running = false
}

// loadPolicies reads the policy document from the policy secret and replaces
// the in-memory policy set with its normalized contents.
//
// An empty payload leaves the in-memory policies untouched. Entries without a
// namespace are skipped, a non-positive interval is replaced with
// defaultPolicyHours, dedupe defaults to true when absent, and the stored id is
// ignored in favour of policyKey(namespace, pvc). Missing created_at values
// default to the load time and missing updated_at values to created_at.
func (s *Server) loadPolicies(ctx context.Context) error {
	raw, err := s.client.LoadSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey)
	if err != nil {
		return err
	}
	if len(raw) == 0 {
		return nil
	}

	var doc struct {
		Policies []struct {
			ID            string  `json:"id"`
			Namespace     string  `json:"namespace"`
			PVC           string  `json:"pvc,omitempty"`
			IntervalHours float64 `json:"interval_hours"`
			Enabled       bool    `json:"enabled"`
			Dedupe        *bool   `json:"dedupe,omitempty"`
			KeepLast      *int    `json:"keep_last,omitempty"`
			CreatedAt     string  `json:"created_at,omitempty"`
			UpdatedAt     string  `json:"updated_at,omitempty"`
		} `json:"policies"`
	}
	if err := json.Unmarshal(raw, &doc); err != nil {
		return fmt.Errorf("decode policy document: %w", err)
	}

	next := map[string]api.BackupPolicy{}
	now := time.Now().UTC().Format(time.RFC3339)
	for _, policy := range doc.Policies {
		namespace := strings.TrimSpace(policy.Namespace)
		pvc := strings.TrimSpace(policy.PVC)
		if namespace == "" {
			continue
		}

		interval := policy.IntervalHours
		if interval <= 0 {
			interval = defaultPolicyHours
		}

		dedupe := true
		if policy.Dedupe != nil {
			dedupe = *policy.Dedupe
		}
		keepLast := keepLastDefault(policy.KeepLast)

		id := policyKey(namespace, pvc)
		createdAt := policy.CreatedAt
		if createdAt == "" {
			createdAt = now
		}
		updatedAt := policy.UpdatedAt
		if updatedAt == "" {
			updatedAt = createdAt
		}

		// Duplicate namespace/PVC pairs collapse onto one id; the last entry wins.
		next[id] = api.BackupPolicy{
			ID:            id,
			Namespace:     namespace,
			PVC:           pvc,
			IntervalHours: interval,
			Enabled:       policy.Enabled,
			Dedupe:        dedupe,
			KeepLast:      keepLast,
			CreatedAt:     createdAt,
			UpdatedAt:     updatedAt,
		}
	}

	s.policyMu.Lock()
	s.policies = next
	s.policyMu.Unlock()
	return nil
}

// persistPolicies encodes policies as a JSON document of the form
// {"policies": [...]} and saves it to the policy secret through the client.
// The string map passed along identifies the object as the soteria
// policy-store (presumably applied as labels; confirm against SaveSecretData).
func (s *Server) persistPolicies(ctx context.Context, policies []api.BackupPolicy) error {
	doc := struct {
		Policies []api.BackupPolicy `json:"policies"`
	}{
		Policies: policies,
	}

	payload, err := json.Marshal(doc)
	if err != nil {
		return fmt.Errorf("encode policy document: %w", err)
	}

	return s.client.SaveSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey, payload, map[string]string{
		"app.kubernetes.io/name":      "soteria",
		"app.kubernetes.io/component": "policy-store",
	})
}

// listPolicies returns a copy of all in-memory policies sorted by namespace,
// then PVC, then ID.
func (s *Server) listPolicies() []api.BackupPolicy {
	s.policyMu.RLock()
	defer s.policyMu.RUnlock()

	policies := make([]api.BackupPolicy, 0, len(s.policies))
	for _, policy := range s.policies {
		policies = append(policies, policy)
	}
	sort.Slice(policies, func(i, j int) bool {
		if policies[i].Namespace != policies[j].Namespace {
			return policies[i].Namespace < policies[j].Namespace
		}
		if policies[i].PVC != policies[j].PVC {
			return policies[i].PVC < policies[j].PVC
		}
		return policies[i].ID < policies[j].ID
	})
	return policies
}

// activePolicies returns the enabled policies in the order produced by
// listPolicies.
func (s *Server) activePolicies() []api.BackupPolicy {
	policies := s.listPolicies()
	filtered := make([]api.BackupPolicy, 0, len(policies))
	for _, policy := range policies {
		if policy.Enabled {
			filtered = append(filtered, policy)
		}
	}
	return filtered
}

// upsertPolicy validates req, creates or replaces the policy identified by
// policyKey(namespace, pvc), and persists the full policy set.
//
// A non-positive interval falls back to defaultPolicyHours and an interval
// above maxPolicyIntervalHrs is rejected. Enabled defaults to true when unset.
// CreatedAt is carried over from an existing policy with the same id. When
// persisting fails the in-memory change is rolled back and the error returned.
func (s *Server) upsertPolicy(ctx context.Context, req api.BackupPolicyUpsertRequest) (api.BackupPolicy, error) {
	namespace := strings.TrimSpace(req.Namespace)
	pvc := strings.TrimSpace(req.PVC)
	if namespace == "" {
		return api.BackupPolicy{}, fmt.Errorf("namespace is required")
	}
	if err := validateKubernetesName("namespace", namespace); err != nil {
		return api.BackupPolicy{}, err
	}
	if pvc != "" {
		if err := validateKubernetesName("pvc", pvc); err != nil {
			return api.BackupPolicy{}, err
		}
	}

	interval := req.IntervalHours
	if interval <= 0 {
		interval = defaultPolicyHours
	}
	if interval > maxPolicyIntervalHrs {
		return api.BackupPolicy{}, fmt.Errorf("interval_hours must be <= %d", maxPolicyIntervalHrs)
	}

	enabled := true
	if req.Enabled != nil {
		enabled = *req.Enabled
	}
	dedupe := dedupeDefault(req.Dedupe)
	if err := validateKeepLast(req.KeepLast); err != nil {
		return api.BackupPolicy{}, err
	}
	keepLast := keepLastDefault(req.KeepLast)

	id := policyKey(namespace, pvc)
	now := time.Now().UTC().Format(time.RFC3339)

	s.policyMu.Lock()
	// loadPolicies returns early on an empty secret without assigning the map,
	// so guard against writing to a nil map.
	if s.policies == nil {
		s.policies = make(map[string]api.BackupPolicy)
	}
	previous, hadPrevious := s.policies[id]
	createdAt := now
	if hadPrevious && previous.CreatedAt != "" {
		createdAt = previous.CreatedAt
	}
	policy := api.BackupPolicy{
		ID:            id,
		Namespace:     namespace,
		PVC:           pvc,
		IntervalHours: interval,
		Enabled:       enabled,
		Dedupe:        dedupe,
		KeepLast:      keepLast,
		CreatedAt:     createdAt,
		UpdatedAt:     now,
	}
	s.policies[id] = policy
	snapshot := policySliceFromMap(s.policies)
	s.policyMu.Unlock()

	// The lock is released while persisting, so other policies may change in
	// the meantime. Roll back only the entry touched here rather than restoring
	// a snapshot of the whole map, which would discard those changes.
	if err := s.persistPolicies(ctx, snapshot); err != nil {
		s.policyMu.Lock()
		if hadPrevious {
			s.policies[id] = previous
		} else {
			delete(s.policies, id)
		}
		s.policyMu.Unlock()
		return api.BackupPolicy{}, err
	}
	return policy, nil
}

// deletePolicy removes the policy with the given id and persists the remaining
// set. It returns false with a nil error when id is blank or unknown. When
// persisting fails the removed policy is put back and the error returned.
func (s *Server) deletePolicy(ctx context.Context, id string) (bool, error) {
	id = strings.TrimSpace(id)
	if id == "" {
		return false, nil
	}

	s.policyMu.Lock()
	removed, ok := s.policies[id]
	if !ok {
		s.policyMu.Unlock()
		return false, nil
	}
	delete(s.policies, id)
	snapshot := policySliceFromMap(s.policies)
	s.policyMu.Unlock()

	// Restore only the removed entry, and only if nothing recreated it while
	// the lock was released; restoring a whole-map snapshot would discard
	// unrelated concurrent changes.
	if err := s.persistPolicies(ctx, snapshot); err != nil {
		s.policyMu.Lock()
		if _, recreated := s.policies[id]; !recreated {
			s.policies[id] = removed
		}
		s.policyMu.Unlock()
		return false, err
	}
	return true, nil
}

// policyKey builds the policy id "<namespace>__<pvc>" from the trimmed inputs,
// using "_all" as the PVC part when pvc is blank.
func policyKey(namespace, pvc string) string {
	scope := strings.TrimSpace(pvc)
	if scope == "" {
		scope = "_all"
	}
	return strings.TrimSpace(namespace) + "__" + scope
}

// policySliceFromMap returns the values of source as a slice sorted by ID.
func policySliceFromMap(source map[string]api.BackupPolicy) []api.BackupPolicy {
	out := make([]api.BackupPolicy, 0, len(source))
	for _, policy := range source {
		out = append(out, policy)
	}
	sort.Slice(out, func(i, j int) bool {
		return out[i].ID < out[j].ID
	})
	return out
}

// clonePolicyMap returns a new map holding the same key/value pairs as source.
func clonePolicyMap(source map[string]api.BackupPolicy) map[string]api.BackupPolicy {
	cloned := make(map[string]api.BackupPolicy, len(source))
	for key, value := range source {
		cloned[key] = value
	}
	return cloned
}

// backupDue reports whether at least intervalHours have elapsed since
// lastBackupAt. A non-positive interval falls back to defaultPolicyHours. A
// blank or unparseable timestamp counts as due.
func backupDue(lastBackupAt string, intervalHours float64) bool {
	if intervalHours <= 0 {
		intervalHours = defaultPolicyHours
	}
	if strings.TrimSpace(lastBackupAt) == "" {
		return true
	}
	timestamp, ok := parseBackupTime(lastBackupAt)
	if !ok {
		return true
	}

	// Converting an out-of-range float to an integer type is
	// implementation-dependent and can produce a negative duration, which
	// would make every backup look due. Saturate at the largest duration.
	const maxInterval = time.Duration(1<<63 - 1)
	interval := maxInterval
	if nanos := intervalHours * float64(time.Hour); nanos < float64(maxInterval) {
		interval = time.Duration(nanos)
	}
	return time.Since(timestamp) >= interval
}