package server

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"net/http"
	"net/url"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"scm.bstein.dev/bstein/soteria/internal/api"
	"scm.bstein.dev/bstein/soteria/internal/config"
	"scm.bstein.dev/bstein/soteria/internal/k8s"
	"scm.bstein.dev/bstein/soteria/internal/longhorn"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	k8svalidation "k8s.io/apimachinery/pkg/util/validation"
)

// kubeClient is the subset of Kubernetes operations the server uses.
// New wires a *k8s.Client into it.
type kubeClient interface {
	ResolvePVCVolume(ctx context.Context, namespace, pvcName string) (string, *corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error)
	CreateBackupJob(ctx context.Context, cfg *config.Config, req api.BackupRequest) (string, string, error)
	CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error)
	ListBackupJobsForPVC(ctx context.Context, namespace, pvc string) ([]k8s.BackupJobSummary, error)
	ListBoundPVCs(ctx context.Context) ([]k8s.PVCSummary, error)
	PersistentVolumeClaimExists(ctx context.Context, namespace, pvcName string) (bool, error)
	LoadSecretData(ctx context.Context, namespace, secretName, key string) ([]byte, error)
	SaveSecretData(ctx context.Context, namespace, secretName, key string, value []byte, labels map[string]string) error
}

// longhornClient is the subset of Longhorn operations the server uses.
// New wires a *longhorn.Client into it.
type longhornClient interface {
	CreateSnapshot(ctx context.Context, volume, name string, labels map[string]string) error
	SnapshotBackup(ctx context.Context, volume, name string, labels map[string]string, backupMode string) (*longhorn.Volume, error)
	GetVolume(ctx context.Context, volume string) (*longhorn.Volume, error)
	CreateVolumeFromBackup(ctx context.Context, name, size string, replicas int, backupURL string) (*longhorn.Volume, error)
	CreatePVC(ctx context.Context, volumeName, namespace, pvcName string) error
	DeleteVolume(ctx context.Context, volumeName string) error
	FindBackup(ctx context.Context, volumeName, snapshot string) (*longhorn.Backup, error)
	ListBackups(ctx context.Context, volumeName string) ([]longhorn.Backup, error)
}

// Server serves the Soteria HTTP API and UI and runs the background
// telemetry, policy and B2 refresh loops started by Start.
type Server struct {
	cfg      *config.Config
	client   kubeClient
	longhorn longhornClient
	metrics  *telemetry
	handler  http.Handler
	ui       *uiRenderer

	// policyMu guards policies for readers and short in-memory mutations.
	policyMu sync.RWMutex
	policies map[string]api.BackupPolicy
	// policyWriteMu serializes entire mutate-then-persist sequences so the
	// persisted policy document cannot diverge from the in-memory map and a
	// failed write's rollback cannot discard another writer's change.
	policyWriteMu sync.Mutex

	// runMu guards running, which prevents overlapping policy cycles.
	runMu   sync.Mutex
	running bool

	// NOTE(review): b2Mu presumably guards b2Usage; both are used outside
	// this section.
	b2Mu    sync.RWMutex
	b2Usage api.B2UsageResponse
}

// authIdentity describes the caller as determined by authorize.
type authIdentity struct {
	Authenticated bool
	User          string
	Email         string
	Groups        []string
}

// ctxKey is a private type so the auth context key cannot collide with keys
// from other packages.
type ctxKey string

const authContextKey ctxKey = "soteria-auth"

const (
	// policySecretKey is the data key inside the policy Secret.
	policySecretKey = "policies.json"
	// defaultPolicyHours is used when a policy has no positive interval.
	defaultPolicyHours = 24.0
	// maxPolicyIntervalHrs caps policy intervals at one year.
	maxPolicyIntervalHrs = 24 * 365
)

// New builds a Server with an empty policy set. Call Start to load policies
// and launch the background loops.
func New(cfg *config.Config, client *k8s.Client, lh *longhorn.Client) *Server {
	s := &Server{
		cfg:      cfg,
		client:   client,
		longhorn: lh,
		metrics:  newTelemetry(),
		ui:       newUIRenderer(),
		policies: map[string]api.BackupPolicy{},
	}
	s.handler = http.HandlerFunc(s.route)
	return s
}

// Start loads policies and runs one telemetry refresh, one B2 refresh and one
// policy cycle synchronously. It then starts a goroutine that repeats those
// refreshes on their configured intervals until ctx is cancelled.
func (s *Server) Start(ctx context.Context) {
	if err := s.loadPolicies(ctx); err != nil {
		log.Printf("policy load failed: %v", err)
	}
	s.refreshTelemetry(ctx)
	s.refreshB2Usage(ctx)
	s.runPolicyCycle(ctx)

	metricsTicker := time.NewTicker(s.cfg.MetricsRefreshInterval)
	policyTicker := time.NewTicker(s.cfg.PolicyEvalInterval)
	// b2Tick stays nil when B2 is disabled. Receiving from a nil channel
	// blocks forever, so that select case never fires.
	var b2Ticker *time.Ticker
	var b2Tick <-chan time.Time
	if s.cfg.B2Enabled {
		b2Ticker = time.NewTicker(s.cfg.B2ScanInterval)
		b2Tick = b2Ticker.C
	}

	go func() {
		defer metricsTicker.Stop()
		defer policyTicker.Stop()
		if b2Ticker != nil {
			defer b2Ticker.Stop()
		}
		for {
			select {
			case <-ctx.Done():
				return
			case <-metricsTicker.C:
				s.refreshTelemetry(ctx)
			case <-policyTicker.C:
				s.runPolicyCycle(ctx)
			case <-b2Tick:
				s.refreshB2Usage(ctx)
			}
		}
	}()
}

// Handler returns the http.Handler that serves every route.
func (s *Server) Handler() http.Handler {
	return s.handler
}

// route dispatches requests. /healthz, /readyz and /metrics bypass
// authorization. Every other path is authorized first, and the resulting
// identity is stored in the request context for handlers to read.
func (s *Server) route(w http.ResponseWriter, r *http.Request) {
	switch r.URL.Path {
	case "/healthz":
		s.handleHealth(w, r)
		return
	case "/readyz":
		s.handleReady(w, r)
		return
	case "/metrics":
		s.metrics.Handler().ServeHTTP(w, r)
		return
	}

	identity, status, err := s.authorize(r)
	if err != nil {
		s.metrics.RecordAuthzDenied(authzReason(status, err))
		writeError(w, status, err.Error())
		return
	}
	r = r.WithContext(context.WithValue(r.Context(), authContextKey, identity))

	switch r.URL.Path {
	case "/":
		s.handleUI(w, r)
	case "/v1/b2":
		s.handleB2Usage(w, r)
	case "/v1/whoami":
		s.handleWhoAmI(w, r)
	case "/v1/inventory":
		s.handleInventory(w, r)
	case "/v1/backups":
		s.handleBackups(w, r)
	case "/v1/backup":
		s.handleBackup(w, r)
	case "/v1/backup/namespace":
		s.handleNamespaceBackup(w, r)
	case "/v1/restores", "/v1/restore-test":
		s.handleRestore(w, r)
	case "/v1/restores/namespace":
		s.handleNamespaceRestore(w, r)
	case "/v1/policies":
		s.handlePolicies(w, r)
	default:
		if s.ui != nil && s.ui.ServeAsset(w, r) {
			return
		}
		if strings.HasPrefix(r.URL.Path, "/v1/policies/") {
			s.handlePolicyByID(w, r)
			return
		}
		// Serve SPA index for deep links (for example /backup) while preserving
		// explicit API and asset 404 behavior.
		if r.Method == http.MethodGet && !strings.HasPrefix(r.URL.Path, "/v1/") && !strings.Contains(r.URL.Path, ".") {
			s.handleUI(w, r)
			return
		}
		writeError(w, http.StatusNotFound, "not found")
	}
}

// handleHealth always reports {"status":"ok"}.
func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
	writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}

// handleReady always reports {"status":"ready"}.
func (s *Server) handleReady(w http.ResponseWriter, _ *http.Request) {
	writeJSON(w, http.StatusOK, map[string]string{"status": "ready"})
}

// handleUI serves the UI index page for GET requests.
func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	if s.ui == nil {
		writeError(w, http.StatusInternalServerError, "UI renderer is unavailable")
		return
	}
	if err := s.ui.ServeIndex(w, r); err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
	}
}

// handleWhoAmI echoes the identity that route stored in the request context.
func (s *Server) handleWhoAmI(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	identity := requesterFromContext(r.Context())
	writeJSON(w, http.StatusOK, api.AuthInfoResponse{
		Authenticated: identity.Authenticated,
		User:          identity.User,
		Email:         identity.Email,
		Groups:        identity.Groups,
	})
}

// handleInventory returns the PVC inventory. It responds 502 when the
// inventory cannot be built.
func (s *Server) handleInventory(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	inventory, err := s.buildInventory(r.Context())
	if err != nil {
		writeError(w, http.StatusBadGateway, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, inventory)
}

// handleBackups lists backups for the PVC named by the "namespace" and "pvc"
// query parameters, using the configured backup driver.
func (s *Server) handleBackups(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	query := r.URL.Query()
	namespace := strings.TrimSpace(query.Get("namespace"))
	pvcName := strings.TrimSpace(query.Get("pvc"))
	if namespace == "" || pvcName == "" {
		writeError(w, http.StatusBadRequest, "namespace and pvc are required")
		return
	}
	volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), namespace, pvcName)
	if err != nil {
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}

	switch s.cfg.BackupDriver {
	case "longhorn":
		backups, err := s.longhorn.ListBackups(r.Context(), volumeName)
		if err != nil {
			writeError(w, http.StatusBadGateway, err.Error())
			return
		}
		writeJSON(w, http.StatusOK, api.BackupListResponse{
			Namespace: namespace,
			PVC:       pvcName,
			Volume:    volumeName,
			Backups:   buildBackupRecords(backups),
		})
	case "restic":
		jobs, err := s.client.ListBackupJobsForPVC(r.Context(), namespace, pvcName)
		if err != nil {
			writeError(w, http.StatusBadGateway, err.Error())
			return
		}
		writeJSON(w, http.StatusOK, api.BackupListResponse{
			Namespace: namespace,
			PVC:       pvcName,
			Volume:    volumeName,
			Backups:   buildResticBackupRecords(jobs),
		})
	default:
		writeError(w, http.StatusBadRequest, "unsupported backup driver")
	}
}

// handleBackup starts a backup of a single PVC from a JSON body of at most
// 1 MiB. Every outcome is recorded in backup request metrics.
func (s *Server) handleBackup(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
	var req api.BackupRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "invalid_json")
		writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err))
		return
	}
	req.Namespace = strings.TrimSpace(req.Namespace)
	req.PVC = strings.TrimSpace(req.PVC)
	if req.Namespace == "" || req.PVC == "" {
		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "validation_error")
		writeError(w, http.StatusBadRequest, "namespace and pvc are required")
		return
	}

	requester := currentRequester(r.Context())
	response, result, err := s.executeBackup(r.Context(), req, requester)
	s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result)
	if err != nil {
		writeError(w, backupStatusCode(result), err.Error())
		return
	}
	writeJSON(w, http.StatusOK, response)
}

// handleRestore restores one PVC into a new target PVC. If target_namespace
// is omitted, it defaults to the source namespace. All names must be DNS-1123
// labels, and the target must differ from the source.
func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	reject := func(status int, result, message string) {
		s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, result)
		writeError(w, status, message)
	}

	r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
	var req api.RestoreTestRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		reject(http.StatusBadRequest, "invalid_json", fmt.Sprintf("invalid JSON: %v", err))
		return
	}
	req.Namespace = strings.TrimSpace(req.Namespace)
	req.PVC = strings.TrimSpace(req.PVC)
	req.TargetPVC = strings.TrimSpace(req.TargetPVC)
	req.TargetNamespace = strings.TrimSpace(req.TargetNamespace)

	switch {
	case req.Namespace == "":
		reject(http.StatusBadRequest, "validation_error", "namespace is required")
		return
	case req.PVC == "":
		reject(http.StatusBadRequest, "validation_error", "pvc is required")
		return
	case req.TargetPVC == "":
		reject(http.StatusBadRequest, "validation_error", "target_pvc is required")
		return
	}
	if req.TargetNamespace == "" {
		req.TargetNamespace = req.Namespace
	}

	for _, field := range []struct{ name, value string }{
		{"namespace", req.Namespace},
		{"pvc", req.PVC},
		{"target_namespace", req.TargetNamespace},
		{"target_pvc", req.TargetPVC},
	} {
		if err := validateKubernetesName(field.name, field.value); err != nil {
			reject(http.StatusBadRequest, "validation_error", err.Error())
			return
		}
	}
	if req.Namespace == req.TargetNamespace && req.PVC == req.TargetPVC {
		reject(http.StatusConflict, "conflict", "target namespace/pvc must differ from source")
		return
	}

	requester := currentRequester(r.Context())
	response, result, err := s.executeRestore(r.Context(), req, requester)
	s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, result)
	if err != nil {
		writeError(w, restoreStatusCode(result), err.Error())
		return
	}
	writeJSON(w, http.StatusOK, response)
}

// handleNamespaceBackup backs up every bound PVC in a namespace.
// Per-PVC failures are reported in the results, and the overall response
// status is still 200. Each PVC outcome is recorded as a backup request, and
// the aggregate as a namespace backup request.
func (s *Server) handleNamespaceBackup(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	reject := func(status int, result, message string) {
		s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, result)
		writeError(w, status, message)
	}

	r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
	var req api.NamespaceBackupRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		reject(http.StatusBadRequest, "invalid_json", fmt.Sprintf("invalid JSON: %v", err))
		return
	}
	req.Namespace = strings.TrimSpace(req.Namespace)
	if req.Namespace == "" {
		reject(http.StatusBadRequest, "validation_error", "namespace is required")
		return
	}
	if err := validateKubernetesName("namespace", req.Namespace); err != nil {
		reject(http.StatusBadRequest, "validation_error", err.Error())
		return
	}
	pvcs, err := s.listNamespaceBoundPVCs(r.Context(), req.Namespace)
	if err != nil {
		reject(http.StatusBadGateway, "backend_error", err.Error())
		return
	}

	requester := currentRequester(r.Context())
	response := api.NamespaceBackupResponse{
		Namespace:   req.Namespace,
		RequestedBy: requester,
		Driver:      s.cfg.BackupDriver,
		DryRun:      req.DryRun,
		Results:     make([]api.NamespaceBackupResult, 0, len(pvcs)),
	}
	for _, pvc := range pvcs {
		result, status, execErr := s.executeBackup(r.Context(), api.BackupRequest{
			Namespace: req.Namespace,
			PVC:       pvc.Name,
			DryRun:    req.DryRun,
		}, requester)
		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, status)

		item := api.NamespaceBackupResult{
			Namespace: req.Namespace,
			PVC:       pvc.Name,
			Status:    status,
			Volume:    result.Volume,
			Backup:    result.Backup,
		}
		if execErr != nil {
			item.Error = execErr.Error()
			response.Failed++
		} else {
			response.Succeeded++
		}
		response.Results = append(response.Results, item)
	}
	response.Total = len(response.Results)
	s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, namespaceResultStatus(req.DryRun, response.Total, response.Succeeded, response.Failed))
	writeJSON(w, http.StatusOK, response)
}

// handleNamespaceRestore restores every bound PVC of a namespace into
// target_namespace (default: the source namespace). Target PVC names are
// derived by targetPVCName from target_prefix. Per-PVC failures are reported
// in the results, and the overall response status is still 200.
func (s *Server) handleNamespaceRestore(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	reject := func(status int, result, message string) {
		s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, result)
		writeError(w, status, message)
	}

	r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
	var req api.NamespaceRestoreRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		reject(http.StatusBadRequest, "invalid_json", fmt.Sprintf("invalid JSON: %v", err))
		return
	}
	req.Namespace = strings.TrimSpace(req.Namespace)
	req.TargetNamespace = strings.TrimSpace(req.TargetNamespace)
	req.TargetPrefix = strings.TrimSpace(req.TargetPrefix)
	req.Snapshot = strings.TrimSpace(req.Snapshot)
	if req.Namespace == "" {
		reject(http.StatusBadRequest, "validation_error", "namespace is required")
		return
	}
	if req.TargetNamespace == "" {
		req.TargetNamespace = req.Namespace
	}
	if err := validateKubernetesName("namespace", req.Namespace); err != nil {
		reject(http.StatusBadRequest, "validation_error", err.Error())
		return
	}
	if err := validateKubernetesName("target_namespace", req.TargetNamespace); err != nil {
		reject(http.StatusBadRequest, "validation_error", err.Error())
		return
	}
	pvcs, err := s.listNamespaceBoundPVCs(r.Context(), req.Namespace)
	if err != nil {
		reject(http.StatusBadGateway, "backend_error", err.Error())
		return
	}

	requester := currentRequester(r.Context())
	response := api.NamespaceRestoreResponse{
		Namespace:       req.Namespace,
		TargetNamespace: req.TargetNamespace,
		RequestedBy:     requester,
		Driver:          s.cfg.BackupDriver,
		DryRun:          req.DryRun,
		Results:         make([]api.NamespaceRestoreResult, 0, len(pvcs)),
	}
	for _, pvc := range pvcs {
		targetPVC := targetPVCName(req.TargetPrefix, pvc.Name)
		result, status, execErr := s.executeRestore(r.Context(), api.RestoreTestRequest{
			Namespace:       req.Namespace,
			PVC:             pvc.Name,
			Snapshot:        req.Snapshot,
			TargetNamespace: req.TargetNamespace,
			TargetPVC:       targetPVC,
			DryRun:          req.DryRun,
		}, requester)
		s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, status)

		item := api.NamespaceRestoreResult{
			Namespace:       req.Namespace,
			PVC:             pvc.Name,
			TargetNamespace: req.TargetNamespace,
			TargetPVC:       targetPVC,
			Status:          status,
			Volume:          result.Volume,
			BackupURL:       result.BackupURL,
		}
		if execErr != nil {
			item.Error = execErr.Error()
			response.Failed++
		} else {
			response.Succeeded++
		}
		response.Results = append(response.Results, item)
	}
	response.Total = len(response.Results)
	s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, namespaceResultStatus(req.DryRun, response.Total, response.Succeeded, response.Failed))
	writeJSON(w, http.StatusOK, response)
}

// handlePolicies lists policies (GET) or creates/updates one (POST).
func (s *Server) handlePolicies(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		writeJSON(w, http.StatusOK, api.BackupPolicyListResponse{Policies: s.listPolicies()})
	case http.MethodPost:
		r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
		var req api.BackupPolicyUpsertRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err))
			return
		}
		policy, err := s.upsertPolicy(r.Context(), req)
		if err != nil {
			writeError(w, http.StatusBadRequest, err.Error())
			return
		}
		writeJSON(w, http.StatusOK, policy)
	default:
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
	}
}

// handlePolicyByID deletes the policy whose URL-escaped ID follows
// "/v1/policies/".
func (s *Server) handlePolicyByID(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodDelete {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	// r.URL.Path is already percent-decoded. Decode the escaped form exactly
	// once so an ID is not unescaped twice.
	encodedID := strings.TrimPrefix(r.URL.EscapedPath(), "/v1/policies/")
	if encodedID == "" {
		writeError(w, http.StatusBadRequest, "policy id is required")
		return
	}
	id, err := url.PathUnescape(encodedID)
	if err != nil {
		writeError(w, http.StatusBadRequest, "invalid policy id")
		return
	}
	removed, err := s.deletePolicy(r.Context(), id)
	if err != nil {
		writeError(w, http.StatusBadGateway, err.Error())
		return
	}
	if !removed {
		writeError(w, http.StatusNotFound, "policy not found")
		return
	}
	writeJSON(w, http.StatusOK, map[string]string{"deleted": id})
}

// executeBackup runs a backup of one PVC with the configured driver. For dry
// runs it only plans the backup.
//
// It returns the response, a result label ("success", "dry_run",
// "validation_error", "backend_error", "unsupported_driver") used for metrics
// and by backupStatusCode, and an error when the backup was not started.
func (s *Server) executeBackup(ctx context.Context, req api.BackupRequest, requester string) (api.BackupResponse, string, error) {
	req.Namespace = strings.TrimSpace(req.Namespace)
	req.PVC = strings.TrimSpace(req.PVC)
	if req.Namespace == "" || req.PVC == "" {
		return api.BackupResponse{}, "validation_error", fmt.Errorf("namespace and pvc are required")
	}

	switch s.cfg.BackupDriver {
	case "longhorn":
		volumeName, _, _, err := s.client.ResolvePVCVolume(ctx, req.Namespace, req.PVC)
		if err != nil {
			return api.BackupResponse{}, "validation_error", err
		}
		backupID := backupName("backup", req.Namespace+"-"+req.PVC)
		response := api.BackupResponse{
			Driver:      "longhorn",
			Volume:      volumeName,
			Backup:      backupID,
			Namespace:   req.Namespace,
			RequestedBy: requester,
			DryRun:      req.DryRun,
		}
		if req.DryRun {
			return response, "dry_run", nil
		}
		labels := map[string]string{
			"soteria.bstein.dev/namespace":    req.Namespace,
			"soteria.bstein.dev/pvc":          req.PVC,
			"soteria.bstein.dev/requested-by": requester,
		}
		// Snapshot first, then back up that snapshot under the same name.
		if err := s.longhorn.CreateSnapshot(ctx, volumeName, backupID, labels); err != nil {
			return api.BackupResponse{}, "backend_error", err
		}
		if _, err := s.longhorn.SnapshotBackup(ctx, volumeName, backupID, labels, s.cfg.LonghornBackupMode); err != nil {
			return api.BackupResponse{}, "backend_error", err
		}
		return response, "success", nil

	case "restic":
		// The job client is called for dry runs too. It receives req.DryRun
		// and decides what to create.
		jobName, secretName, err := s.client.CreateBackupJob(ctx, s.cfg, req)
		if err != nil {
			return api.BackupResponse{}, "backend_error", err
		}
		result := "success"
		if req.DryRun {
			result = "dry_run"
		}
		return api.BackupResponse{
			Driver:      "restic",
			JobName:     jobName,
			Namespace:   req.Namespace,
			Secret:      secretName,
			RequestedBy: requester,
			DryRun:      req.DryRun,
		}, result, nil

	default:
		return api.BackupResponse{}, "unsupported_driver", fmt.Errorf("unsupported backup driver")
	}
}

// executeRestore restores a PVC's backup into req.TargetNamespace/TargetPVC
// with the configured driver. The result label is interpreted by
// restoreStatusCode.
//
// With the longhorn driver it:
//   - refuses ("conflict") when the target PVC already exists;
//   - uses req.BackupURL, or else looks up a backup via FindBackup;
//   - creates a volume from the backup with the source volume's size and
//     replica count (2 when the source reports 0);
//   - creates the target PVC, deleting the new volume if that step fails.
func (s *Server) executeRestore(ctx context.Context, req api.RestoreTestRequest, requester string) (api.RestoreTestResponse, string, error) {
	req.Namespace = strings.TrimSpace(req.Namespace)
	req.PVC = strings.TrimSpace(req.PVC)
	req.TargetNamespace = strings.TrimSpace(req.TargetNamespace)
	req.TargetPVC = strings.TrimSpace(req.TargetPVC)
	req.BackupURL = strings.TrimSpace(req.BackupURL)
	req.Snapshot = strings.TrimSpace(req.Snapshot)
	if req.TargetNamespace == "" {
		req.TargetNamespace = req.Namespace
	}

	switch s.cfg.BackupDriver {
	case "longhorn":
		exists, err := s.client.PersistentVolumeClaimExists(ctx, req.TargetNamespace, req.TargetPVC)
		if err != nil {
			return api.RestoreTestResponse{}, "validation_error", err
		}
		if exists {
			return api.RestoreTestResponse{}, "conflict", fmt.Errorf("target pvc %s/%s already exists", req.TargetNamespace, req.TargetPVC)
		}
		volumeName, _, _, err := s.client.ResolvePVCVolume(ctx, req.Namespace, req.PVC)
		if err != nil {
			return api.RestoreTestResponse{}, "validation_error", err
		}

		backupURL := req.BackupURL
		if backupURL == "" {
			backup, err := s.longhorn.FindBackup(ctx, volumeName, req.Snapshot)
			if err != nil {
				return api.RestoreTestResponse{}, "validation_error", err
			}
			// Guard against a nil backup with a nil error (not ruled out by
			// the interface), so it surfaces as a missing URL, not a panic.
			if backup != nil {
				backupURL = strings.TrimSpace(backup.URL)
			}
		}
		if backupURL == "" {
			return api.RestoreTestResponse{}, "validation_error", fmt.Errorf("backup_url is required")
		}

		restoreVolumeName := backupName("restore", req.TargetNamespace+"-"+req.TargetPVC)
		response := api.RestoreTestResponse{
			Driver:          "longhorn",
			Volume:          restoreVolumeName,
			TargetNamespace: req.TargetNamespace,
			TargetPVC:       req.TargetPVC,
			BackupURL:       backupURL,
			Namespace:       req.Namespace,
			RequestedBy:     requester,
			DryRun:          req.DryRun,
		}
		if req.DryRun {
			return response, "dry_run", nil
		}

		sourceVolume, err := s.longhorn.GetVolume(ctx, volumeName)
		if err != nil {
			return api.RestoreTestResponse{}, "backend_error", err
		}
		if sourceVolume == nil {
			return api.RestoreTestResponse{}, "backend_error", fmt.Errorf("source volume %s not found", volumeName)
		}
		replicas := sourceVolume.NumberOfReplicas
		if replicas == 0 {
			replicas = 2
		}
		if _, err := s.longhorn.CreateVolumeFromBackup(ctx, restoreVolumeName, sourceVolume.Size, replicas, backupURL); err != nil {
			return api.RestoreTestResponse{}, "backend_error", err
		}
		if err := s.longhorn.CreatePVC(ctx, restoreVolumeName, req.TargetNamespace, req.TargetPVC); err != nil {
			// Do not leave an orphaned restore volume behind.
			if cleanupErr := s.longhorn.DeleteVolume(ctx, restoreVolumeName); cleanupErr != nil {
				log.Printf("restore cleanup failed for %s: %v", restoreVolumeName, cleanupErr)
				return api.RestoreTestResponse{}, "backend_error", fmt.Errorf("create restore pvc: %w (cleanup failed: %v)", err, cleanupErr)
			}
			return api.RestoreTestResponse{}, "backend_error", fmt.Errorf("create restore pvc: %w", err)
		}
		return response, "success", nil

	case "restic":
		// As with backups, the job client receives req.DryRun itself.
		jobName, secretName, err := s.client.CreateRestoreJob(ctx, s.cfg, req)
		if err != nil {
			return api.RestoreTestResponse{}, "backend_error", err
		}
		result := "success"
		if req.DryRun {
			result = "dry_run"
		}
		return api.RestoreTestResponse{
			Driver:          "restic",
			JobName:         jobName,
			Namespace:       req.Namespace,
			TargetNamespace: req.TargetNamespace,
			TargetPVC:       req.TargetPVC,
			Secret:          secretName,
			RequestedBy:     requester,
			DryRun:          req.DryRun,
		}, result, nil

	default:
		return api.RestoreTestResponse{}, "unsupported_driver", fmt.Errorf("unsupported backup driver")
	}
}

// listNamespaceBoundPVCs returns the bound PVCs that belong to namespace.
func (s *Server) listNamespaceBoundPVCs(ctx context.Context, namespace string) ([]k8s.PVCSummary, error) {
	items, err := s.client.ListBoundPVCs(ctx)
	if err != nil {
		return nil, err
	}
	filtered := make([]k8s.PVCSummary, 0, len(items))
	for _, item := range items {
		if item.Namespace == namespace {
			filtered = append(filtered, item)
		}
	}
	return filtered, nil
}

// backupStatusCode maps an executeBackup result label to an HTTP status.
func backupStatusCode(result string) int {
	switch result {
	case "validation_error", "unsupported_driver":
		return http.StatusBadRequest
	case "backend_error":
		return http.StatusBadGateway
	default:
		return http.StatusInternalServerError
	}
}

// restoreStatusCode maps an executeRestore result label to an HTTP status.
func restoreStatusCode(result string) int {
	switch result {
	case "validation_error", "unsupported_driver":
		return http.StatusBadRequest
	case "conflict":
		return http.StatusConflict
	case "backend_error":
		return http.StatusBadGateway
	default:
		return http.StatusInternalServerError
	}
}

// namespaceResultStatus summarizes a namespace-wide run as "dry_run", "empty",
// "success", "failed" or "partial".
func namespaceResultStatus(dryRun bool, total, succeeded, failed int) string {
	switch {
	case dryRun:
		return "dry_run"
	case total == 0:
		return "empty"
	case failed == 0:
		return "success"
	case succeeded == 0:
		return "failed"
	default:
		return "partial"
	}
}

// targetPVCName derives "<prefix>-<sourcePVC>", passed through sanitizeName.
// The prefix defaults to "restore", and the result is cut to the 63-character
// DNS-1123 label limit with edge dashes trimmed.
func targetPVCName(prefix, sourcePVC string) string {
	prefix = sanitizeName(prefix)
	if prefix == "" {
		prefix = "restore"
	}
	if !strings.HasSuffix(prefix, "-") {
		prefix += "-"
	}
	name := sanitizeName(prefix + sourcePVC)
	if name == "" {
		name = "restore"
	}
	if len(name) > 63 {
		name = strings.Trim(name[:63], "-")
	}
	if name == "" {
		name = "restore"
	}
	return name
}

// buildInventory lists bound PVCs, enriches each with backup health, and
// groups them by namespace. Namespaces are sorted by name. PVC order within a
// namespace follows ListBoundPVCs.
func (s *Server) buildInventory(ctx context.Context) (api.InventoryResponse, error) {
	pvcs, err := s.client.ListBoundPVCs(ctx)
	if err != nil {
		return api.InventoryResponse{}, err
	}

	groups := make(map[string][]api.PVCInventory)
	for _, summary := range pvcs {
		entry := api.PVCInventory{
			Namespace:    summary.Namespace,
			PVC:          summary.Name,
			Volume:       summary.VolumeName,
			Phase:        summary.Phase,
			StorageClass: summary.StorageClass,
			Capacity:     summary.Capacity,
			AccessModes:  summary.AccessModes,
			Driver:       s.cfg.BackupDriver,
		}
		s.enrichPVCInventory(ctx, &entry)
		groups[summary.Namespace] = append(groups[summary.Namespace], entry)
	}

	namespaceNames := make([]string, 0, len(groups))
	for namespace := range groups {
		namespaceNames = append(namespaceNames, namespace)
	}
	sort.Strings(namespaceNames)

	response := api.InventoryResponse{
		GeneratedAt: time.Now().UTC().Format(time.RFC3339),
		Namespaces:  make([]api.NamespaceInventory, 0, len(namespaceNames)),
	}
	for _, namespace := range namespaceNames {
		response.Namespaces = append(response.Namespaces, api.NamespaceInventory{
			Name: namespace,
			PVCs: groups[namespace],
		})
	}
	return response, nil
}

// enrichPVCInventory fills in backup counts, the latest completed backup and
// the health verdict for one PVC. HealthReason is one of "lookup_failed",
// "missing", "no_completed", "unknown_timestamp", "fresh", "stale" or
// "unsupported_driver". Healthy is true only for "fresh", meaning the latest
// completed backup is no older than cfg.BackupMaxAge.
func (s *Server) enrichPVCInventory(ctx context.Context, entry *api.PVCInventory) {
	lookupFailed := func(err error) {
		entry.Healthy = false
		entry.HealthReason = "lookup_failed"
		entry.Error = err.Error()
	}
	// noCompleted explains why no usable backup exists: nothing at all
	// ("missing"), or nothing in the Completed state ("no_completed").
	noCompleted := func(total int) {
		entry.Healthy = false
		if total == 0 {
			entry.HealthReason = "missing"
		} else {
			entry.HealthReason = "no_completed"
		}
	}
	unknownTimestamp := func() {
		entry.Healthy = false
		entry.HealthReason = "unknown_timestamp"
	}
	markFreshness := func(latest time.Time) {
		age := time.Since(latest)
		entry.LastBackupAgeHours = roundHours(age.Hours())
		entry.Healthy = age <= s.cfg.BackupMaxAge
		if entry.Healthy {
			entry.HealthReason = "fresh"
		} else {
			entry.HealthReason = "stale"
		}
	}

	switch s.cfg.BackupDriver {
	case "longhorn":
		backups, err := s.longhorn.ListBackups(ctx, entry.Volume)
		if err != nil {
			lookupFailed(err)
			return
		}
		entry.BackupCount = len(backups)
		totalBackupSize := int64(0)
		completedBackups := 0
		for _, backup := range backups {
			totalBackupSize += parseSizeBytes(backup.Size)
			if strings.EqualFold(backup.State, "Completed") {
				completedBackups++
			}
		}
		entry.CompletedBackups = completedBackups
		entry.TotalBackupSizeBytes = float64(totalBackupSize)

		latest, latestTime, ok := latestCompletedBackup(backups)
		if !ok {
			noCompleted(len(backups))
			return
		}
		entry.LastBackupAt = latest.Created
		entry.LastBackupSizeBytes = float64(parseSizeBytes(latest.Size))
		if latestTime.IsZero() {
			unknownTimestamp()
			return
		}
		markFreshness(latestTime)

	case "restic":
		jobs, err := s.client.ListBackupJobsForPVC(ctx, entry.Namespace, entry.PVC)
		if err != nil {
			lookupFailed(err)
			return
		}
		entry.BackupCount = len(jobs)
		// Take the newest timestamp across all completed jobs instead of
		// relying on the order in which ListBackupJobsForPVC returns them.
		completed := 0
		var latestTime time.Time
		for _, job := range jobs {
			if !strings.EqualFold(job.State, "Completed") {
				continue
			}
			completed++
			if ts := backupJobTimestamp(job); ts.After(latestTime) {
				latestTime = ts
			}
		}
		entry.CompletedBackups = completed
		if completed == 0 {
			noCompleted(len(jobs))
			return
		}
		if latestTime.IsZero() {
			unknownTimestamp()
			return
		}
		entry.LastBackupAt = latestTime.UTC().Format(time.RFC3339)
		markFreshness(latestTime)

	default:
		entry.Healthy = false
		entry.HealthReason = "unsupported_driver"
	}
}

// refreshTelemetry rebuilds the inventory (bounded to 2 minutes) and
// publishes it to metrics. On failure it records an inventory failure.
func (s *Server) refreshTelemetry(ctx context.Context) {
	refreshCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
	defer cancel()
	inventory, err := s.buildInventory(refreshCtx)
	if err != nil {
		log.Printf("inventory refresh failed: %v", err)
		s.metrics.RecordInventoryFailure()
		return
	}
	s.metrics.RecordInventory(inventory)
}

// runPolicyCycle backs up every PVC covered by an enabled policy whose last
// backup is older than the policy interval. A cycle is skipped if one is
// already running, and the whole cycle is bounded to 3 minutes.
func (s *Server) runPolicyCycle(ctx context.Context) {
	if !s.beginRun() {
		return
	}
	defer s.endRun()

	policies := s.activePolicies()
	if len(policies) == 0 {
		return
	}

	runCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
	defer cancel()

	inventory, err := s.buildInventory(runCtx)
	if err != nil {
		log.Printf("policy cycle inventory failed: %v", err)
		s.metrics.RecordPolicyBackup("inventory_error")
		return
	}

	pvcMap := make(map[string]api.PVCInventory)
	namespaceMap := make(map[string][]api.PVCInventory)
	for _, group := range inventory.Namespaces {
		for _, pvc := range group.PVCs {
			pvcMap[pvc.Namespace+"/"+pvc.PVC] = pvc
			namespaceMap[pvc.Namespace] = append(namespaceMap[pvc.Namespace], pvc)
		}
	}

	// When several policies cover the same PVC (for example a namespace-wide
	// one and a PVC-specific one), the shortest interval wins.
	effectiveIntervals := map[string]float64{}
	for _, policy := range policies {
		var matches []api.PVCInventory
		if policy.PVC != "" {
			if pvc, ok := pvcMap[policy.Namespace+"/"+policy.PVC]; ok {
				matches = append(matches, pvc)
			}
		} else {
			matches = namespaceMap[policy.Namespace]
		}
		for _, pvc := range matches {
			key := pvc.Namespace + "/" + pvc.PVC
			if current, exists := effectiveIntervals[key]; !exists || policy.IntervalHours < current {
				effectiveIntervals[key] = policy.IntervalHours
			}
		}
	}

	for key, intervalHours := range effectiveIntervals {
		pvc, ok := pvcMap[key]
		if !ok {
			continue
		}
		if !backupDue(pvc.LastBackupAt, intervalHours) {
			s.metrics.RecordPolicyBackup("not_due")
			continue
		}
		_, result, err := s.executeBackup(runCtx, api.BackupRequest{
			Namespace: pvc.Namespace,
			PVC:       pvc.PVC,
			DryRun:    false,
		}, "policy-scheduler")
		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result)
		if err != nil {
			s.metrics.RecordPolicyBackup(result)
			log.Printf("policy backup failed for %s/%s: %v", pvc.Namespace, pvc.PVC, err)
			continue
		}
		s.metrics.RecordPolicyBackup("success")
	}
}

// beginRun marks a policy cycle as running. It returns false if one already is.
func (s *Server) beginRun() bool {
	s.runMu.Lock()
	defer s.runMu.Unlock()
	if s.running {
		return false
	}
	s.running = true
	return true
}

// endRun clears the running flag set by beginRun.
func (s *Server) endRun() {
	s.runMu.Lock()
	defer s.runMu.Unlock()
	s.running = false
}

// loadPolicies replaces the in-memory policy set with the document stored in
// the policy Secret. Empty data leaves the current set untouched. Entries
// without a namespace are dropped. Missing intervals and timestamps are
// defaulted, and IDs are recomputed from namespace/PVC.
func (s *Server) loadPolicies(ctx context.Context) error {
	// Serialize with upsert/delete so a concurrent write is not overwritten
	// by a stale read of the Secret.
	s.policyWriteMu.Lock()
	defer s.policyWriteMu.Unlock()

	raw, err := s.client.LoadSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey)
	if err != nil {
		return err
	}
	if len(raw) == 0 {
		return nil
	}
	var doc struct {
		Policies []api.BackupPolicy `json:"policies"`
	}
	if err := json.Unmarshal(raw, &doc); err != nil {
		return fmt.Errorf("decode policy document: %w", err)
	}

	next := make(map[string]api.BackupPolicy, len(doc.Policies))
	now := time.Now().UTC().Format(time.RFC3339)
	for _, policy := range doc.Policies {
		namespace := strings.TrimSpace(policy.Namespace)
		if namespace == "" {
			continue
		}
		pvc := strings.TrimSpace(policy.PVC)
		interval := policy.IntervalHours
		if interval <= 0 {
			interval = defaultPolicyHours
		}
		createdAt := policy.CreatedAt
		if createdAt == "" {
			createdAt = now
		}
		updatedAt := policy.UpdatedAt
		if updatedAt == "" {
			updatedAt = createdAt
		}
		id := policyKey(namespace, pvc)
		next[id] = api.BackupPolicy{
			ID:            id,
			Namespace:     namespace,
			PVC:           pvc,
			IntervalHours: interval,
			Enabled:       policy.Enabled,
			CreatedAt:     createdAt,
			UpdatedAt:     updatedAt,
		}
	}

	s.policyMu.Lock()
	s.policies = next
	s.policyMu.Unlock()
	return nil
}

// persistPolicies writes policies as {"policies": [...]} JSON to the policy
// Secret, labelled as Soteria's policy store.
func (s *Server) persistPolicies(ctx context.Context, policies []api.BackupPolicy) error {
	doc := struct {
		Policies []api.BackupPolicy `json:"policies"`
	}{
		Policies: policies,
	}
	payload, err := json.Marshal(doc)
	if err != nil {
		return fmt.Errorf("encode policy document: %w", err)
	}
	return s.client.SaveSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey, payload, map[string]string{
		"app.kubernetes.io/name":      "soteria",
		"app.kubernetes.io/component": "policy-store",
	})
}

// listPolicies returns a copy of all policies sorted by namespace, PVC and ID.
func (s *Server) listPolicies() []api.BackupPolicy {
	s.policyMu.RLock()
	defer s.policyMu.RUnlock()
	policies := make([]api.BackupPolicy, 0, len(s.policies))
	for _, policy := range s.policies {
		policies = append(policies, policy)
	}
	sort.Slice(policies, func(i, j int) bool {
		if policies[i].Namespace != policies[j].Namespace {
			return policies[i].Namespace < policies[j].Namespace
		}
		if policies[i].PVC != policies[j].PVC {
			return policies[i].PVC < policies[j].PVC
		}
		return policies[i].ID < policies[j].ID
	})
	return policies
}

// activePolicies returns the enabled policies in listPolicies order.
func (s *Server) activePolicies() []api.BackupPolicy {
	policies := s.listPolicies()
	filtered := make([]api.BackupPolicy, 0, len(policies))
	for _, policy := range policies {
		if policy.Enabled {
			filtered = append(filtered, policy)
		}
	}
	return filtered
}

// upsertPolicy validates and stores a policy keyed by namespace/PVC. An empty
// PVC means the policy covers the whole namespace. It keeps the original
// CreatedAt and persists the full set, and it rolls the in-memory change back
// if persisting fails.
func (s *Server) upsertPolicy(ctx context.Context, req api.BackupPolicyUpsertRequest) (api.BackupPolicy, error) {
	namespace := strings.TrimSpace(req.Namespace)
	pvc := strings.TrimSpace(req.PVC)
	if namespace == "" {
		return api.BackupPolicy{}, fmt.Errorf("namespace is required")
	}
	if err := validateKubernetesName("namespace", namespace); err != nil {
		return api.BackupPolicy{}, err
	}
	if pvc != "" {
		if err := validateKubernetesName("pvc", pvc); err != nil {
			return api.BackupPolicy{}, err
		}
	}
	interval := req.IntervalHours
	if interval <= 0 {
		interval = defaultPolicyHours
	}
	if interval > maxPolicyIntervalHrs {
		return api.BackupPolicy{}, fmt.Errorf("interval_hours must be <= %d", maxPolicyIntervalHrs)
	}
	enabled := true
	if req.Enabled != nil {
		enabled = *req.Enabled
	}
	id := policyKey(namespace, pvc)
	now := time.Now().UTC().Format(time.RFC3339)

	// Hold the write lock across mutate+persist(+rollback) so writers are
	// applied and persisted in the same order.
	s.policyWriteMu.Lock()
	defer s.policyWriteMu.Unlock()

	s.policyMu.Lock()
	before := clonePolicyMap(s.policies)
	createdAt := now
	if existing, ok := s.policies[id]; ok && existing.CreatedAt != "" {
		createdAt = existing.CreatedAt
	}
	policy := api.BackupPolicy{
		ID:            id,
		Namespace:     namespace,
		PVC:           pvc,
		IntervalHours: interval,
		Enabled:       enabled,
		CreatedAt:     createdAt,
		UpdatedAt:     now,
	}
	s.policies[id] = policy
	snapshot := policySliceFromMap(s.policies)
	s.policyMu.Unlock()

	if err := s.persistPolicies(ctx, snapshot); err != nil {
		s.policyMu.Lock()
		s.policies = before
		s.policyMu.Unlock()
		return api.BackupPolicy{}, err
	}
	return policy, nil
}

// deletePolicy removes the policy with the given ID and persists the change.
// It reports false (with a nil error) when no such policy exists, and it
// restores the in-memory set if persisting fails.
func (s *Server) deletePolicy(ctx context.Context, id string) (bool, error) {
	id = strings.TrimSpace(id)
	if id == "" {
		return false, nil
	}

	s.policyWriteMu.Lock()
	defer s.policyWriteMu.Unlock()

	s.policyMu.Lock()
	if _, ok := s.policies[id]; !ok {
		s.policyMu.Unlock()
		return false, nil
	}
	before := clonePolicyMap(s.policies)
	delete(s.policies, id)
	snapshot := policySliceFromMap(s.policies)
	s.policyMu.Unlock()

	if err := s.persistPolicies(ctx, snapshot); err != nil {
		s.policyMu.Lock()
		s.policies = before
		s.policyMu.Unlock()
		return false, err
	}
	return true, nil
}

// policyKey builds a policy ID: "<namespace>__<pvc>", or "<namespace>___all"
// for a namespace-wide policy.
func policyKey(namespace, pvc string) string {
	scope := strings.TrimSpace(pvc)
	if scope == "" {
		scope = "_all"
	}
	return strings.TrimSpace(namespace) + "__" + scope
}

// policySliceFromMap returns the map's policies sorted by ID.
func policySliceFromMap(source map[string]api.BackupPolicy) []api.BackupPolicy {
	out := make([]api.BackupPolicy, 0, len(source))
	for _, policy := range source {
		out = append(out, policy)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].ID < out[j].ID })
	return out
}

// clonePolicyMap returns a shallow copy of source.
func clonePolicyMap(source map[string]api.BackupPolicy) map[string]api.BackupPolicy {
	cloned := make(map[string]api.BackupPolicy, len(source))
	for key, value := range source {
		cloned[key] = value
	}
	return cloned
}
func backupDue(lastBackupAt string, intervalHours float64) bool { if intervalHours <= 0 { intervalHours = defaultPolicyHours } if strings.TrimSpace(lastBackupAt) == "" { return true } timestamp, ok := parseBackupTime(lastBackupAt) if !ok { return true } interval := time.Duration(intervalHours * float64(time.Hour)) return time.Since(timestamp) >= interval } func (s *Server) authorize(r *http.Request) (authIdentity, int, error) { if !s.cfg.AuthRequired { return authIdentity{}, http.StatusOK, nil } authorization := strings.TrimSpace(r.Header.Get("Authorization")) if strings.HasPrefix(strings.ToLower(authorization), "bearer ") { token := strings.TrimSpace(authorization[7:]) for _, expected := range s.cfg.AuthBearerTokens { if token != "" && token == expected { return authIdentity{Authenticated: true, User: "service-token", Groups: []string{"service-token"}}, http.StatusOK, nil } } } identity := authIdentity{ Authenticated: true, User: firstHeader(r, "X-Auth-Request-User", "X-Forwarded-User"), Email: firstHeader(r, "X-Auth-Request-Email", "X-Forwarded-Email"), Groups: normalizeGroups(splitGroups(firstHeader(r, "X-Auth-Request-Groups", "X-Forwarded-Groups"))), } if identity.User == "" && identity.Email == "" { return authIdentity{}, http.StatusUnauthorized, fmt.Errorf("authentication required") } if len(s.cfg.AllowedGroups) == 0 { return identity, http.StatusOK, nil } if hasAllowedGroup(identity.Groups, s.cfg.AllowedGroups) { return identity, http.StatusOK, nil } return authIdentity{}, http.StatusForbidden, fmt.Errorf("access requires one of: %s", strings.Join(s.cfg.AllowedGroups, ", ")) } func requesterFromContext(ctx context.Context) authIdentity { identity, _ := ctx.Value(authContextKey).(authIdentity) return identity } func currentRequester(ctx context.Context) string { identity := requesterFromContext(ctx) if identity.User != "" { return identity.User } if identity.Email != "" { return identity.Email } if identity.Authenticated { return "authenticated" } return 
"anonymous" } func authzReason(status int, err error) string { if err == nil { return "unknown" } switch status { case http.StatusUnauthorized: return "unauthenticated" case http.StatusForbidden: return "forbidden_group" default: return "error" } } func hasAllowedGroup(actual, allowed []string) bool { allowedSet := make(map[string]struct{}, len(allowed)) for _, group := range normalizeGroups(allowed) { allowedSet[group] = struct{}{} } for _, group := range normalizeGroups(actual) { if _, ok := allowedSet[group]; ok { return true } } return false } func normalizeGroups(values []string) []string { groups := make([]string, 0, len(values)) for _, value := range values { value = strings.TrimSpace(strings.TrimPrefix(value, "/")) if value == "" { continue } groups = append(groups, value) } return groups } func firstHeader(r *http.Request, names ...string) string { for _, name := range names { value := strings.TrimSpace(r.Header.Get(name)) if value != "" { return value } } return "" } func splitGroups(raw string) []string { raw = strings.TrimSpace(raw) if raw == "" { return nil } return strings.FieldsFunc(raw, func(r rune) bool { return r == ',' || r == ';' }) } func validateKubernetesName(field, value string) error { if errs := k8svalidation.IsDNS1123Label(value); len(errs) > 0 { return fmt.Errorf("%s must be a valid Kubernetes DNS-1123 label", field) } return nil } func buildBackupRecords(backups []longhorn.Backup) []api.BackupRecord { records := make([]api.BackupRecord, 0, len(backups)) latestName := "" if latest, _, ok := latestCompletedBackup(backups); ok { latestName = latest.Name } sort.Slice(backups, func(i, j int) bool { left, lok := parseBackupTime(backups[i].Created) right, rok := parseBackupTime(backups[j].Created) switch { case lok && rok: return left.After(right) case lok: return true case rok: return false default: return backups[i].Name > backups[j].Name } }) for _, backup := range backups { records = append(records, api.BackupRecord{ Name: backup.Name, 
SnapshotName: backup.SnapshotName, Created: backup.Created, State: backup.State, URL: backup.URL, Size: backup.Size, Latest: backup.Name == latestName, }) } return records } func buildResticBackupRecords(jobs []k8s.BackupJobSummary) []api.BackupRecord { records := make([]api.BackupRecord, 0, len(jobs)) latestName := "" for _, job := range jobs { if strings.EqualFold(job.State, "Completed") { latestName = job.Name break } } for _, job := range jobs { created := "" if ts := backupJobTimestamp(job); !ts.IsZero() { created = ts.UTC().Format(time.RFC3339) } url := "" latest := job.Name == latestName if latest && strings.EqualFold(job.State, "Completed") { // The restore API defaults to "latest"; expose one selectable option in the UI. url = "latest" } records = append(records, api.BackupRecord{ Name: job.Name, SnapshotName: job.Name, Created: created, State: job.State, URL: url, Latest: latest, }) } return records } func backupJobTimestamp(job k8s.BackupJobSummary) time.Time { if !job.CompletionTime.IsZero() { return job.CompletionTime } return job.CreatedAt } func latestCompletedBackup(backups []longhorn.Backup) (longhorn.Backup, time.Time, bool) { var selected longhorn.Backup var selectedTime time.Time found := false for _, backup := range backups { if backup.State != "Completed" { continue } createdAt, ok := parseBackupTime(backup.Created) if !ok { if !found { selected = backup found = true } continue } if !found || createdAt.After(selectedTime) { selected = backup selectedTime = createdAt found = true } } return selected, selectedTime, found } func parseBackupTime(raw string) (time.Time, bool) { layouts := []string{time.RFC3339Nano, time.RFC3339} for _, layout := range layouts { parsed, err := time.Parse(layout, raw) if err == nil { return parsed, true } } return time.Time{}, false } func writeJSON(w http.ResponseWriter, status int, payload any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(payload) } func 
writeError(w http.ResponseWriter, status int, message string) { writeJSON(w, status, map[string]string{"error": message}) } func backupName(prefix, value string) string { base := sanitizeName(fmt.Sprintf("soteria-%s-%s", prefix, value)) timestamp := time.Now().UTC().Format("20060102-150405") name := fmt.Sprintf("%s-%s", base, timestamp) if len(name) <= 63 { return name } maxBase := 63 - len(timestamp) - 1 if maxBase < 1 { maxBase = 1 } if len(base) > maxBase { base = base[:maxBase] } return fmt.Sprintf("%s-%s", base, timestamp) } func sanitizeName(value string) string { value = strings.ToLower(value) value = strings.ReplaceAll(value, "_", "-") value = strings.ReplaceAll(value, ".", "-") value = strings.ReplaceAll(value, " ", "-") value = strings.Trim(value, "-") return value } func roundHours(value float64) float64 { return math.Round(value*100) / 100 } func parseSizeBytes(raw string) int64 { raw = strings.TrimSpace(raw) if raw == "" { return 0 } if value, err := strconv.ParseInt(raw, 10, 64); err == nil { return value } if value, err := strconv.ParseFloat(raw, 64); err == nil { if value < 0 { return 0 } return int64(value) } if quantity, err := resource.ParseQuantity(raw); err == nil { return quantity.Value() } return 0 }