380 lines
9.6 KiB
Go
380 lines
9.6 KiB
Go
|
|
package server
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"encoding/json"
|
||
|
|
"fmt"
|
||
|
|
"log"
|
||
|
|
"sort"
|
||
|
|
"strings"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"scm.bstein.dev/bstein/soteria/internal/api"
|
||
|
|
)
|
||
|
|
|
||
|
|
func (s *Server) runPolicyCycle(ctx context.Context) {
|
||
|
|
if !s.beginRun() {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
defer s.endRun()
|
||
|
|
|
||
|
|
policies := s.activePolicies()
|
||
|
|
if len(policies) == 0 {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
runCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
|
||
|
|
defer cancel()
|
||
|
|
|
||
|
|
inventory, err := s.buildInventory(runCtx)
|
||
|
|
if err != nil {
|
||
|
|
log.Printf("policy cycle inventory failed: %v", err)
|
||
|
|
s.metrics.RecordPolicyBackup("inventory_error")
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
pvcMap := make(map[string]api.PVCInventory)
|
||
|
|
namespaceMap := make(map[string][]api.PVCInventory)
|
||
|
|
for _, group := range inventory.Namespaces {
|
||
|
|
for _, pvc := range group.PVCs {
|
||
|
|
key := pvc.Namespace + "/" + pvc.PVC
|
||
|
|
pvcMap[key] = pvc
|
||
|
|
namespaceMap[pvc.Namespace] = append(namespaceMap[pvc.Namespace], pvc)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
type effectivePolicy struct {
|
||
|
|
IntervalHours float64
|
||
|
|
Dedupe bool
|
||
|
|
KeepLast int
|
||
|
|
}
|
||
|
|
effectivePolicies := map[string]effectivePolicy{}
|
||
|
|
for _, policy := range policies {
|
||
|
|
matches := []api.PVCInventory{}
|
||
|
|
if policy.PVC != "" {
|
||
|
|
if pvc, ok := pvcMap[policy.Namespace+"/"+policy.PVC]; ok {
|
||
|
|
matches = append(matches, pvc)
|
||
|
|
}
|
||
|
|
} else {
|
||
|
|
matches = append(matches, namespaceMap[policy.Namespace]...)
|
||
|
|
}
|
||
|
|
|
||
|
|
for _, pvc := range matches {
|
||
|
|
key := pvc.Namespace + "/" + pvc.PVC
|
||
|
|
current, exists := effectivePolicies[key]
|
||
|
|
if !exists || policy.IntervalHours < current.IntervalHours || (policy.IntervalHours == current.IntervalHours && keepLastStricter(policy.KeepLast, current.KeepLast)) {
|
||
|
|
effectivePolicies[key] = effectivePolicy{
|
||
|
|
IntervalHours: policy.IntervalHours,
|
||
|
|
Dedupe: policy.Dedupe,
|
||
|
|
KeepLast: policy.KeepLast,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
for key, effective := range effectivePolicies {
|
||
|
|
pvc, ok := pvcMap[key]
|
||
|
|
if !ok {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
// Never enqueue a new policy backup while one is already active for this PVC.
|
||
|
|
// This prevents runaway job storms when a backup is stuck Pending/Running.
|
||
|
|
if pvc.ActiveBackups > 0 {
|
||
|
|
s.metrics.RecordPolicyBackup("in_progress")
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
|
||
|
|
lastRunRef := strings.TrimSpace(pvc.LastBackupAt)
|
||
|
|
if lastRunRef == "" {
|
||
|
|
// If no successful backup exists yet, fall back to the most recent job start
|
||
|
|
// so failed attempts are still throttled by interval_hours.
|
||
|
|
lastRunRef = strings.TrimSpace(pvc.LastJobStartedAt)
|
||
|
|
}
|
||
|
|
|
||
|
|
if !backupDue(lastRunRef, effective.IntervalHours) {
|
||
|
|
s.metrics.RecordPolicyBackup("not_due")
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
|
||
|
|
_, result, err := s.executeBackup(runCtx, api.BackupRequest{
|
||
|
|
Namespace: pvc.Namespace,
|
||
|
|
PVC: pvc.PVC,
|
||
|
|
DryRun: false,
|
||
|
|
Dedupe: boolPtr(effective.Dedupe),
|
||
|
|
KeepLast: intPtr(effective.KeepLast),
|
||
|
|
}, "policy-scheduler")
|
||
|
|
s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result)
|
||
|
|
if err != nil {
|
||
|
|
s.metrics.RecordPolicyBackup(result)
|
||
|
|
log.Printf("policy backup failed for %s/%s: %v", pvc.Namespace, pvc.PVC, err)
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
s.metrics.RecordPolicyBackup("success")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Server) beginRun() bool {
|
||
|
|
s.runMu.Lock()
|
||
|
|
defer s.runMu.Unlock()
|
||
|
|
if s.running {
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
s.running = true
|
||
|
|
return true
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Server) endRun() {
|
||
|
|
s.runMu.Lock()
|
||
|
|
defer s.runMu.Unlock()
|
||
|
|
s.running = false
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Server) loadPolicies(ctx context.Context) error {
|
||
|
|
raw, err := s.client.LoadSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
if len(raw) == 0 {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
var doc struct {
|
||
|
|
Policies []struct {
|
||
|
|
ID string `json:"id"`
|
||
|
|
Namespace string `json:"namespace"`
|
||
|
|
PVC string `json:"pvc,omitempty"`
|
||
|
|
IntervalHours float64 `json:"interval_hours"`
|
||
|
|
Enabled bool `json:"enabled"`
|
||
|
|
Dedupe *bool `json:"dedupe,omitempty"`
|
||
|
|
KeepLast *int `json:"keep_last,omitempty"`
|
||
|
|
CreatedAt string `json:"created_at,omitempty"`
|
||
|
|
UpdatedAt string `json:"updated_at,omitempty"`
|
||
|
|
} `json:"policies"`
|
||
|
|
}
|
||
|
|
if err := json.Unmarshal(raw, &doc); err != nil {
|
||
|
|
return fmt.Errorf("decode policy document: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
next := map[string]api.BackupPolicy{}
|
||
|
|
now := time.Now().UTC().Format(time.RFC3339)
|
||
|
|
for _, policy := range doc.Policies {
|
||
|
|
namespace := strings.TrimSpace(policy.Namespace)
|
||
|
|
pvc := strings.TrimSpace(policy.PVC)
|
||
|
|
if namespace == "" {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
interval := policy.IntervalHours
|
||
|
|
if interval <= 0 {
|
||
|
|
interval = defaultPolicyHours
|
||
|
|
}
|
||
|
|
dedupe := true
|
||
|
|
if policy.Dedupe != nil {
|
||
|
|
dedupe = *policy.Dedupe
|
||
|
|
}
|
||
|
|
keepLast := keepLastDefault(policy.KeepLast)
|
||
|
|
id := policyKey(namespace, pvc)
|
||
|
|
createdAt := policy.CreatedAt
|
||
|
|
if createdAt == "" {
|
||
|
|
createdAt = now
|
||
|
|
}
|
||
|
|
updatedAt := policy.UpdatedAt
|
||
|
|
if updatedAt == "" {
|
||
|
|
updatedAt = createdAt
|
||
|
|
}
|
||
|
|
next[id] = api.BackupPolicy{
|
||
|
|
ID: id,
|
||
|
|
Namespace: namespace,
|
||
|
|
PVC: pvc,
|
||
|
|
IntervalHours: interval,
|
||
|
|
Enabled: policy.Enabled,
|
||
|
|
Dedupe: dedupe,
|
||
|
|
KeepLast: keepLast,
|
||
|
|
CreatedAt: createdAt,
|
||
|
|
UpdatedAt: updatedAt,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
s.policyMu.Lock()
|
||
|
|
s.policies = next
|
||
|
|
s.policyMu.Unlock()
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Server) persistPolicies(ctx context.Context, policies []api.BackupPolicy) error {
|
||
|
|
doc := struct {
|
||
|
|
Policies []api.BackupPolicy `json:"policies"`
|
||
|
|
}{
|
||
|
|
Policies: policies,
|
||
|
|
}
|
||
|
|
payload, err := json.Marshal(doc)
|
||
|
|
if err != nil {
|
||
|
|
return fmt.Errorf("encode policy document: %w", err)
|
||
|
|
}
|
||
|
|
return s.client.SaveSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey, payload, map[string]string{
|
||
|
|
"app.kubernetes.io/name": "soteria",
|
||
|
|
"app.kubernetes.io/component": "policy-store",
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Server) listPolicies() []api.BackupPolicy {
|
||
|
|
s.policyMu.RLock()
|
||
|
|
defer s.policyMu.RUnlock()
|
||
|
|
policies := make([]api.BackupPolicy, 0, len(s.policies))
|
||
|
|
for _, policy := range s.policies {
|
||
|
|
policies = append(policies, policy)
|
||
|
|
}
|
||
|
|
sort.Slice(policies, func(i, j int) bool {
|
||
|
|
if policies[i].Namespace != policies[j].Namespace {
|
||
|
|
return policies[i].Namespace < policies[j].Namespace
|
||
|
|
}
|
||
|
|
if policies[i].PVC != policies[j].PVC {
|
||
|
|
return policies[i].PVC < policies[j].PVC
|
||
|
|
}
|
||
|
|
return policies[i].ID < policies[j].ID
|
||
|
|
})
|
||
|
|
return policies
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Server) activePolicies() []api.BackupPolicy {
|
||
|
|
policies := s.listPolicies()
|
||
|
|
filtered := make([]api.BackupPolicy, 0, len(policies))
|
||
|
|
for _, policy := range policies {
|
||
|
|
if policy.Enabled {
|
||
|
|
filtered = append(filtered, policy)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return filtered
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Server) upsertPolicy(ctx context.Context, req api.BackupPolicyUpsertRequest) (api.BackupPolicy, error) {
|
||
|
|
namespace := strings.TrimSpace(req.Namespace)
|
||
|
|
pvc := strings.TrimSpace(req.PVC)
|
||
|
|
if namespace == "" {
|
||
|
|
return api.BackupPolicy{}, fmt.Errorf("namespace is required")
|
||
|
|
}
|
||
|
|
if err := validateKubernetesName("namespace", namespace); err != nil {
|
||
|
|
return api.BackupPolicy{}, err
|
||
|
|
}
|
||
|
|
if pvc != "" {
|
||
|
|
if err := validateKubernetesName("pvc", pvc); err != nil {
|
||
|
|
return api.BackupPolicy{}, err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
interval := req.IntervalHours
|
||
|
|
if interval <= 0 {
|
||
|
|
interval = defaultPolicyHours
|
||
|
|
}
|
||
|
|
if interval > maxPolicyIntervalHrs {
|
||
|
|
return api.BackupPolicy{}, fmt.Errorf("interval_hours must be <= %d", maxPolicyIntervalHrs)
|
||
|
|
}
|
||
|
|
enabled := true
|
||
|
|
if req.Enabled != nil {
|
||
|
|
enabled = *req.Enabled
|
||
|
|
}
|
||
|
|
dedupe := dedupeDefault(req.Dedupe)
|
||
|
|
if err := validateKeepLast(req.KeepLast); err != nil {
|
||
|
|
return api.BackupPolicy{}, err
|
||
|
|
}
|
||
|
|
keepLast := keepLastDefault(req.KeepLast)
|
||
|
|
|
||
|
|
id := policyKey(namespace, pvc)
|
||
|
|
now := time.Now().UTC().Format(time.RFC3339)
|
||
|
|
|
||
|
|
s.policyMu.Lock()
|
||
|
|
before := clonePolicyMap(s.policies)
|
||
|
|
createdAt := now
|
||
|
|
if existing, ok := s.policies[id]; ok && existing.CreatedAt != "" {
|
||
|
|
createdAt = existing.CreatedAt
|
||
|
|
}
|
||
|
|
policy := api.BackupPolicy{
|
||
|
|
ID: id,
|
||
|
|
Namespace: namespace,
|
||
|
|
PVC: pvc,
|
||
|
|
IntervalHours: interval,
|
||
|
|
Enabled: enabled,
|
||
|
|
Dedupe: dedupe,
|
||
|
|
KeepLast: keepLast,
|
||
|
|
CreatedAt: createdAt,
|
||
|
|
UpdatedAt: now,
|
||
|
|
}
|
||
|
|
s.policies[id] = policy
|
||
|
|
snapshot := policySliceFromMap(s.policies)
|
||
|
|
s.policyMu.Unlock()
|
||
|
|
|
||
|
|
if err := s.persistPolicies(ctx, snapshot); err != nil {
|
||
|
|
s.policyMu.Lock()
|
||
|
|
s.policies = before
|
||
|
|
s.policyMu.Unlock()
|
||
|
|
return api.BackupPolicy{}, err
|
||
|
|
}
|
||
|
|
return policy, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Server) deletePolicy(ctx context.Context, id string) (bool, error) {
|
||
|
|
id = strings.TrimSpace(id)
|
||
|
|
if id == "" {
|
||
|
|
return false, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
s.policyMu.Lock()
|
||
|
|
if _, ok := s.policies[id]; !ok {
|
||
|
|
s.policyMu.Unlock()
|
||
|
|
return false, nil
|
||
|
|
}
|
||
|
|
before := clonePolicyMap(s.policies)
|
||
|
|
delete(s.policies, id)
|
||
|
|
snapshot := policySliceFromMap(s.policies)
|
||
|
|
s.policyMu.Unlock()
|
||
|
|
|
||
|
|
if err := s.persistPolicies(ctx, snapshot); err != nil {
|
||
|
|
s.policyMu.Lock()
|
||
|
|
s.policies = before
|
||
|
|
s.policyMu.Unlock()
|
||
|
|
return false, err
|
||
|
|
}
|
||
|
|
return true, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// policyKey builds the map key / policy ID for a namespace and optional PVC:
// "<namespace>__<pvc>", or "<namespace>___all" when pvc is blank. Both inputs
// are trimmed of surrounding whitespace.
func policyKey(namespace, pvc string) string {
	ns := strings.TrimSpace(namespace)
	if target := strings.TrimSpace(pvc); target != "" {
		return ns + "__" + target
	}
	return ns + "__" + "_all"
}
|
||
|
|
|
||
|
|
func policySliceFromMap(source map[string]api.BackupPolicy) []api.BackupPolicy {
|
||
|
|
out := make([]api.BackupPolicy, 0, len(source))
|
||
|
|
for _, policy := range source {
|
||
|
|
out = append(out, policy)
|
||
|
|
}
|
||
|
|
sort.Slice(out, func(i, j int) bool {
|
||
|
|
return out[i].ID < out[j].ID
|
||
|
|
})
|
||
|
|
return out
|
||
|
|
}
|
||
|
|
|
||
|
|
func clonePolicyMap(source map[string]api.BackupPolicy) map[string]api.BackupPolicy {
|
||
|
|
cloned := make(map[string]api.BackupPolicy, len(source))
|
||
|
|
for key, value := range source {
|
||
|
|
cloned[key] = value
|
||
|
|
}
|
||
|
|
return cloned
|
||
|
|
}
|
||
|
|
|
||
|
|
func backupDue(lastBackupAt string, intervalHours float64) bool {
|
||
|
|
if intervalHours <= 0 {
|
||
|
|
intervalHours = defaultPolicyHours
|
||
|
|
}
|
||
|
|
if strings.TrimSpace(lastBackupAt) == "" {
|
||
|
|
return true
|
||
|
|
}
|
||
|
|
timestamp, ok := parseBackupTime(lastBackupAt)
|
||
|
|
if !ok {
|
||
|
|
return true
|
||
|
|
}
|
||
|
|
interval := time.Duration(intervalHours * float64(time.Hour))
|
||
|
|
return time.Since(timestamp) >= interval
|
||
|
|
}
|