soteria/internal/server/policy_runtime.go

380 lines
9.6 KiB
Go
Raw Permalink Normal View History

package server
import (
"context"
"encoding/json"
"fmt"
"log"
"sort"
"strings"
"time"
"scm.bstein.dev/bstein/soteria/internal/api"
)
// runPolicyCycle performs one scheduler pass: it resolves the effective backup
// policy for every PVC covered by an enabled policy and requests a backup for
// each PVC whose interval has elapsed.
//
// Only one cycle runs at a time; a call made while another cycle is running
// returns immediately. The whole pass shares a three-minute deadline derived
// from ctx, and dispatching stops as soon as that context is done rather than
// issuing further backup requests against an expired context.
func (s *Server) runPolicyCycle(ctx context.Context) {
	if !s.beginRun() {
		return
	}
	defer s.endRun()

	policies := s.activePolicies()
	if len(policies) == 0 {
		return
	}

	runCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
	defer cancel()

	inventory, err := s.buildInventory(runCtx)
	if err != nil {
		log.Printf("policy cycle inventory failed: %v", err)
		s.metrics.RecordPolicyBackup("inventory_error")
		return
	}

	// Index the inventory twice: by "namespace/pvc" for PVC-scoped policies and
	// by namespace for namespace-wide policies.
	pvcMap := make(map[string]api.PVCInventory)
	namespaceMap := make(map[string][]api.PVCInventory)
	for _, group := range inventory.Namespaces {
		for _, pvc := range group.PVCs {
			key := pvc.Namespace + "/" + pvc.PVC
			pvcMap[key] = pvc
			namespaceMap[pvc.Namespace] = append(namespaceMap[pvc.Namespace], pvc)
		}
	}

	type effectivePolicy struct {
		IntervalHours float64
		Dedupe        bool
		KeepLast      int
	}

	// When several policies cover the same PVC, the shortest interval wins;
	// on an interval tie keepLastStricter decides whether the newcomer replaces
	// the current choice.
	effectivePolicies := map[string]effectivePolicy{}
	for _, policy := range policies {
		var matches []api.PVCInventory
		if policy.PVC != "" {
			if pvc, ok := pvcMap[policy.Namespace+"/"+policy.PVC]; ok {
				matches = append(matches, pvc)
			}
		} else {
			// Read-only use, so the indexed slice can be shared without copying.
			matches = namespaceMap[policy.Namespace]
		}
		for _, pvc := range matches {
			key := pvc.Namespace + "/" + pvc.PVC
			current, exists := effectivePolicies[key]
			if !exists || policy.IntervalHours < current.IntervalHours || (policy.IntervalHours == current.IntervalHours && keepLastStricter(policy.KeepLast, current.KeepLast)) {
				effectivePolicies[key] = effectivePolicy{
					IntervalHours: policy.IntervalHours,
					Dedupe:        policy.Dedupe,
					KeepLast:      policy.KeepLast,
				}
			}
		}
	}

	for key, effective := range effectivePolicies {
		// Stop once the cycle deadline has passed or the parent was cancelled;
		// continuing would only push doomed requests through executeBackup.
		if err := runCtx.Err(); err != nil {
			log.Printf("policy cycle stopped early: %v", err)
			return
		}

		pvc, ok := pvcMap[key]
		if !ok {
			continue
		}
		// Never enqueue a new policy backup while one is already active for this PVC.
		// This prevents runaway job storms when a backup is stuck Pending/Running.
		if pvc.ActiveBackups > 0 {
			s.metrics.RecordPolicyBackup("in_progress")
			continue
		}
		lastRunRef := strings.TrimSpace(pvc.LastBackupAt)
		if lastRunRef == "" {
			// If no successful backup exists yet, fall back to the most recent job start
			// so failed attempts are still throttled by interval_hours.
			lastRunRef = strings.TrimSpace(pvc.LastJobStartedAt)
		}
		if !backupDue(lastRunRef, effective.IntervalHours) {
			s.metrics.RecordPolicyBackup("not_due")
			continue
		}
		_, result, err := s.executeBackup(runCtx, api.BackupRequest{
			Namespace: pvc.Namespace,
			PVC:       pvc.PVC,
			DryRun:    false,
			Dedupe:    boolPtr(effective.Dedupe),
			KeepLast:  intPtr(effective.KeepLast),
		}, "policy-scheduler")
		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result)
		if err != nil {
			s.metrics.RecordPolicyBackup(result)
			log.Printf("policy backup failed for %s/%s: %v", pvc.Namespace, pvc.PVC, err)
			continue
		}
		s.metrics.RecordPolicyBackup("success")
	}
}
// beginRun claims the single policy-cycle slot. It reports true when the
// caller acquired the slot and false when a cycle was already marked running.
func (s *Server) beginRun() bool {
	s.runMu.Lock()
	defer s.runMu.Unlock()

	alreadyRunning := s.running
	// Setting the flag unconditionally is harmless: it is already true when
	// the slot is taken.
	s.running = true
	return !alreadyRunning
}
// endRun releases the policy-cycle slot claimed by beginRun.
func (s *Server) endRun() {
	s.runMu.Lock()
	s.running = false
	s.runMu.Unlock()
}
// loadPolicies reads the policy document from the configured secret and, when
// the secret holds data, replaces the in-memory policy set with its contents.
//
// An empty payload leaves the current in-memory policies untouched. Entries
// without a namespace are skipped. Missing or non-positive intervals fall back
// to defaultPolicyHours, dedupe defaults to true, and missing timestamps are
// filled in (created_at with the load time, updated_at with created_at). The
// policy ID is always recomputed from namespace and PVC; the stored id is not
// used.
func (s *Server) loadPolicies(ctx context.Context) error {
	type storedPolicy struct {
		ID            string  `json:"id"`
		Namespace     string  `json:"namespace"`
		PVC           string  `json:"pvc,omitempty"`
		IntervalHours float64 `json:"interval_hours"`
		Enabled       bool    `json:"enabled"`
		Dedupe        *bool   `json:"dedupe,omitempty"`
		KeepLast      *int    `json:"keep_last,omitempty"`
		CreatedAt     string  `json:"created_at,omitempty"`
		UpdatedAt     string  `json:"updated_at,omitempty"`
	}
	type policyDocument struct {
		Policies []storedPolicy `json:"policies"`
	}

	payload, err := s.client.LoadSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey)
	if err != nil {
		return err
	}
	if len(payload) == 0 {
		return nil
	}

	var doc policyDocument
	if decodeErr := json.Unmarshal(payload, &doc); decodeErr != nil {
		return fmt.Errorf("decode policy document: %w", decodeErr)
	}

	loaded := map[string]api.BackupPolicy{}
	loadedAt := time.Now().UTC().Format(time.RFC3339)
	for _, entry := range doc.Policies {
		ns := strings.TrimSpace(entry.Namespace)
		claim := strings.TrimSpace(entry.PVC)
		if ns == "" {
			continue
		}

		hours := entry.IntervalHours
		if hours <= 0 {
			hours = defaultPolicyHours
		}

		// Dedupe is opt-out: only an explicit false disables it.
		dedupe := entry.Dedupe == nil || *entry.Dedupe

		created := entry.CreatedAt
		if created == "" {
			created = loadedAt
		}
		updated := entry.UpdatedAt
		if updated == "" {
			updated = created
		}

		key := policyKey(ns, claim)
		loaded[key] = api.BackupPolicy{
			ID:            key,
			Namespace:     ns,
			PVC:           claim,
			IntervalHours: hours,
			Enabled:       entry.Enabled,
			Dedupe:        dedupe,
			KeepLast:      keepLastDefault(entry.KeepLast),
			CreatedAt:     created,
			UpdatedAt:     updated,
		}
	}

	s.policyMu.Lock()
	s.policies = loaded
	s.policyMu.Unlock()
	return nil
}
// persistPolicies encodes the given policies as a JSON document of the form
// {"policies": [...]} and saves it under policySecretKey in the configured
// policy secret, passing the soteria policy-store labels along with it.
func (s *Server) persistPolicies(ctx context.Context, policies []api.BackupPolicy) error {
	type policyDocument struct {
		Policies []api.BackupPolicy `json:"policies"`
	}

	encoded, err := json.Marshal(policyDocument{Policies: policies})
	if err != nil {
		return fmt.Errorf("encode policy document: %w", err)
	}

	labels := map[string]string{
		"app.kubernetes.io/name":      "soteria",
		"app.kubernetes.io/component": "policy-store",
	}
	return s.client.SaveSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey, encoded, labels)
}
// listPolicies returns a copy of all known policies ordered by namespace,
// then PVC, then ID. The read lock is held while copying and sorting.
func (s *Server) listPolicies() []api.BackupPolicy {
	s.policyMu.RLock()
	defer s.policyMu.RUnlock()

	result := make([]api.BackupPolicy, 0, len(s.policies))
	for id := range s.policies {
		result = append(result, s.policies[id])
	}

	sort.Slice(result, func(a, b int) bool {
		left, right := result[a], result[b]
		switch {
		case left.Namespace != right.Namespace:
			return left.Namespace < right.Namespace
		case left.PVC != right.PVC:
			return left.PVC < right.PVC
		default:
			return left.ID < right.ID
		}
	})
	return result
}
// activePolicies returns the enabled subset of listPolicies, preserving its
// ordering. The result is never nil.
func (s *Server) activePolicies() []api.BackupPolicy {
	all := s.listPolicies()
	enabled := make([]api.BackupPolicy, 0, len(all))
	for i := range all {
		if !all[i].Enabled {
			continue
		}
		enabled = append(enabled, all[i])
	}
	return enabled
}
// upsertPolicy validates req, creates or replaces the policy identified by
// its namespace/PVC pair, and persists the resulting policy set.
//
// Defaults: a non-positive interval becomes defaultPolicyHours, a nil Enabled
// means enabled, and Dedupe/KeepLast go through dedupeDefault/keepLastDefault.
// An existing policy keeps its CreatedAt; UpdatedAt is always set to now.
//
// It returns the stored policy, or an error when validation fails or the
// policy set cannot be persisted. On a persistence failure only this policy's
// entry is rolled back in memory, so changes made to other policies while the
// lock was released for the save are not discarded.
func (s *Server) upsertPolicy(ctx context.Context, req api.BackupPolicyUpsertRequest) (api.BackupPolicy, error) {
	namespace := strings.TrimSpace(req.Namespace)
	pvc := strings.TrimSpace(req.PVC)
	if namespace == "" {
		return api.BackupPolicy{}, fmt.Errorf("namespace is required")
	}
	if err := validateKubernetesName("namespace", namespace); err != nil {
		return api.BackupPolicy{}, err
	}
	if pvc != "" {
		if err := validateKubernetesName("pvc", pvc); err != nil {
			return api.BackupPolicy{}, err
		}
	}

	interval := req.IntervalHours
	if interval <= 0 {
		interval = defaultPolicyHours
	}
	if interval > maxPolicyIntervalHrs {
		return api.BackupPolicy{}, fmt.Errorf("interval_hours must be <= %d", maxPolicyIntervalHrs)
	}

	enabled := true
	if req.Enabled != nil {
		enabled = *req.Enabled
	}
	dedupe := dedupeDefault(req.Dedupe)
	if err := validateKeepLast(req.KeepLast); err != nil {
		return api.BackupPolicy{}, err
	}
	keepLast := keepLastDefault(req.KeepLast)

	id := policyKey(namespace, pvc)
	now := time.Now().UTC().Format(time.RFC3339)

	s.policyMu.Lock()
	if s.policies == nil {
		// Writing to a nil map panics; tolerate a store that was never loaded.
		s.policies = make(map[string]api.BackupPolicy)
	}
	previous, existed := s.policies[id]
	createdAt := now
	if existed && previous.CreatedAt != "" {
		createdAt = previous.CreatedAt
	}
	policy := api.BackupPolicy{
		ID:            id,
		Namespace:     namespace,
		PVC:           pvc,
		IntervalHours: interval,
		Enabled:       enabled,
		Dedupe:        dedupe,
		KeepLast:      keepLast,
		CreatedAt:     createdAt,
		UpdatedAt:     now,
	}
	s.policies[id] = policy
	snapshot := policySliceFromMap(s.policies)
	s.policyMu.Unlock()

	// The save runs without the lock so readers are not blocked on I/O.
	if err := s.persistPolicies(ctx, snapshot); err != nil {
		s.policyMu.Lock()
		// Undo only this entry. Restoring a whole-map snapshot here would wipe
		// out any other policy change that landed while the lock was released.
		if existed {
			if s.policies == nil {
				s.policies = make(map[string]api.BackupPolicy)
			}
			s.policies[id] = previous
		} else {
			delete(s.policies, id)
		}
		s.policyMu.Unlock()
		return api.BackupPolicy{}, err
	}
	return policy, nil
}
// deletePolicy removes the policy with the given ID and persists the
// remaining policy set.
//
// It returns (false, nil) when id is blank or unknown, (true, nil) when the
// policy was removed and saved, and (false, err) when saving failed. On a
// save failure the removed policy is put back in memory, unless an entry with
// the same ID was recreated while the lock was released for the save; other
// policies are left as they are.
func (s *Server) deletePolicy(ctx context.Context, id string) (bool, error) {
	id = strings.TrimSpace(id)
	if id == "" {
		return false, nil
	}

	s.policyMu.Lock()
	removed, ok := s.policies[id]
	if !ok {
		s.policyMu.Unlock()
		return false, nil
	}
	delete(s.policies, id)
	snapshot := policySliceFromMap(s.policies)
	s.policyMu.Unlock()

	// The save runs without the lock so readers are not blocked on I/O.
	if err := s.persistPolicies(ctx, snapshot); err != nil {
		s.policyMu.Lock()
		// Restore only the deleted entry. Restoring a whole-map snapshot here
		// would discard any other policy change made during the save.
		if _, recreated := s.policies[id]; !recreated {
			if s.policies == nil {
				s.policies = make(map[string]api.BackupPolicy)
			}
			s.policies[id] = removed
		}
		s.policyMu.Unlock()
		return false, err
	}
	return true, nil
}
// policyKey builds the policy ID for a namespace/PVC pair as
// "<namespace>__<pvc>". Both parts are whitespace-trimmed, and a blank PVC is
// represented by the scope "_all".
func policyKey(namespace, pvc string) string {
	ns := strings.TrimSpace(namespace)
	if target := strings.TrimSpace(pvc); target != "" {
		return ns + "__" + target
	}
	return ns + "__" + "_all"
}
// policySliceFromMap flattens a policy map into a slice ordered by policy ID.
// The result is never nil, even for an empty or nil map.
func policySliceFromMap(source map[string]api.BackupPolicy) []api.BackupPolicy {
	ordered := make([]api.BackupPolicy, 0, len(source))
	for key := range source {
		ordered = append(ordered, source[key])
	}
	byID := func(a, b int) bool { return ordered[a].ID < ordered[b].ID }
	sort.Slice(ordered, byID)
	return ordered
}
// clonePolicyMap returns a new map holding the same entries as source.
// A nil source yields an empty, non-nil map.
func clonePolicyMap(source map[string]api.BackupPolicy) map[string]api.BackupPolicy {
	duplicate := make(map[string]api.BackupPolicy, len(source))
	for id := range source {
		duplicate[id] = source[id]
	}
	return duplicate
}
// backupDue reports whether at least intervalHours have elapsed since
// lastBackupAt.
//
// A blank or unparseable lastBackupAt is treated as "never backed up" and is
// always due. A non-positive or NaN interval falls back to defaultPolicyHours.
// Intervals too large to fit in a time.Duration are clamped to the maximum
// duration instead of overflowing.
func backupDue(lastBackupAt string, intervalHours float64) bool {
	// Written as !(x > 0) so that NaN, which fails every comparison, also
	// takes the default.
	if !(intervalHours > 0) {
		intervalHours = defaultPolicyHours
	}
	if strings.TrimSpace(lastBackupAt) == "" {
		return true
	}
	timestamp, ok := parseBackupTime(lastBackupAt)
	if !ok {
		return true
	}

	// Converting an out-of-range float to an integer type yields an
	// implementation-dependent value (typically negative), which would make
	// every backup look due. Clamp before converting.
	const maxDuration = time.Duration(1<<63 - 1)
	interval := maxDuration
	if nanos := intervalHours * float64(time.Hour); nanos < float64(maxDuration) {
		interval = time.Duration(nanos)
	}
	return time.Since(timestamp) >= interval
}