From 0932cae7c3e5fb7d3a34f21e6897e3e93ab60b75 Mon Sep 17 00:00:00 2001 From: codex Date: Mon, 20 Apr 2026 17:24:58 -0300 Subject: [PATCH] refactor(server): split API concerns and retire LOC waivers --- internal/server/auth_support.go | 119 ++ internal/server/backup_handlers.go | 303 ++++ internal/server/inventory_builder.go | 266 ++++ internal/server/policy_http_handlers.go | 60 + internal/server/policy_runtime.go | 379 +++++ internal/server/restic_usage_store.go | 259 +++ internal/server/restore_handlers.go | 316 ++++ internal/server/server.go | 1945 ----------------------- internal/server/server_test.go | 146 ++ internal/server/server_utilities.go | 336 ++++ scripts/loc_hygiene_waivers.tsv | 1 - 11 files changed, 2184 insertions(+), 1946 deletions(-) create mode 100644 internal/server/auth_support.go create mode 100644 internal/server/backup_handlers.go create mode 100644 internal/server/inventory_builder.go create mode 100644 internal/server/policy_http_handlers.go create mode 100644 internal/server/policy_runtime.go create mode 100644 internal/server/restic_usage_store.go create mode 100644 internal/server/restore_handlers.go create mode 100644 internal/server/server_utilities.go diff --git a/internal/server/auth_support.go b/internal/server/auth_support.go new file mode 100644 index 0000000..19d53a9 --- /dev/null +++ b/internal/server/auth_support.go @@ -0,0 +1,119 @@ +package server + +import ( + "context" + "fmt" + "net/http" + "strings" +) + +func (s *Server) authorize(r *http.Request) (authIdentity, int, error) { + if !s.cfg.AuthRequired { + return authIdentity{}, http.StatusOK, nil + } + + authorization := strings.TrimSpace(r.Header.Get("Authorization")) + if strings.HasPrefix(strings.ToLower(authorization), "bearer ") { + token := strings.TrimSpace(authorization[7:]) + for _, expected := range s.cfg.AuthBearerTokens { + if token != "" && token == expected { + return authIdentity{Authenticated: true, User: "service-token", Groups: 
[]string{"service-token"}}, http.StatusOK, nil + } + } + } + + identity := authIdentity{ + Authenticated: true, + User: firstHeader(r, "X-Auth-Request-User", "X-Forwarded-User"), + Email: firstHeader(r, "X-Auth-Request-Email", "X-Forwarded-Email"), + Groups: normalizeGroups(splitGroups(firstHeader(r, "X-Auth-Request-Groups", "X-Forwarded-Groups"))), + } + if identity.User == "" && identity.Email == "" { + return authIdentity{}, http.StatusUnauthorized, fmt.Errorf("authentication required") + } + if len(s.cfg.AllowedGroups) == 0 { + return identity, http.StatusOK, nil + } + if hasAllowedGroup(identity.Groups, s.cfg.AllowedGroups) { + return identity, http.StatusOK, nil + } + return authIdentity{}, http.StatusForbidden, fmt.Errorf("access requires one of: %s", strings.Join(s.cfg.AllowedGroups, ", ")) +} + +func requesterFromContext(ctx context.Context) authIdentity { + identity, _ := ctx.Value(authContextKey).(authIdentity) + return identity +} + +func currentRequester(ctx context.Context) string { + identity := requesterFromContext(ctx) + if identity.User != "" { + return identity.User + } + if identity.Email != "" { + return identity.Email + } + if identity.Authenticated { + return "authenticated" + } + return "anonymous" +} + +func authzReason(status int, err error) string { + if err == nil { + return "unknown" + } + switch status { + case http.StatusUnauthorized: + return "unauthenticated" + case http.StatusForbidden: + return "forbidden_group" + default: + return "error" + } +} + +func hasAllowedGroup(actual, allowed []string) bool { + allowedSet := make(map[string]struct{}, len(allowed)) + for _, group := range normalizeGroups(allowed) { + allowedSet[group] = struct{}{} + } + for _, group := range normalizeGroups(actual) { + if _, ok := allowedSet[group]; ok { + return true + } + } + return false +} + +func normalizeGroups(values []string) []string { + groups := make([]string, 0, len(values)) + for _, value := range values { + value = 
strings.TrimSpace(strings.TrimPrefix(value, "/")) + if value == "" { + continue + } + groups = append(groups, value) + } + return groups +} + +func firstHeader(r *http.Request, names ...string) string { + for _, name := range names { + value := strings.TrimSpace(r.Header.Get(name)) + if value != "" { + return value + } + } + return "" +} + +func splitGroups(raw string) []string { + raw = strings.TrimSpace(raw) + if raw == "" { + return nil + } + return strings.FieldsFunc(raw, func(r rune) bool { + return r == ',' || r == ';' + }) +} diff --git a/internal/server/backup_handlers.go b/internal/server/backup_handlers.go new file mode 100644 index 0000000..4ef4b7b --- /dev/null +++ b/internal/server/backup_handlers.go @@ -0,0 +1,303 @@ +package server + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "scm.bstein.dev/bstein/soteria/internal/api" + "scm.bstein.dev/bstein/soteria/internal/k8s" +) + +func (s *Server) handleInventory(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + inventory, err := s.buildInventory(r.Context()) + if err != nil { + writeError(w, http.StatusBadGateway, err.Error()) + return + } + writeJSON(w, http.StatusOK, inventory) +} + +func (s *Server) handleBackups(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + namespace := strings.TrimSpace(r.URL.Query().Get("namespace")) + pvcName := strings.TrimSpace(r.URL.Query().Get("pvc")) + if namespace == "" || pvcName == "" { + writeError(w, http.StatusBadRequest, "namespace and pvc are required") + return + } + + volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), namespace, pvcName) + if err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + switch s.cfg.BackupDriver { + case "longhorn": + backups, err := 
s.longhorn.ListBackups(r.Context(), volumeName) + if err != nil { + writeError(w, http.StatusBadGateway, err.Error()) + return + } + writeJSON(w, http.StatusOK, api.BackupListResponse{ + Namespace: namespace, + PVC: pvcName, + Volume: volumeName, + Backups: buildBackupRecords(backups), + }) + case "restic": + jobs, err := s.client.ListBackupJobsForPVC(r.Context(), namespace, pvcName) + if err != nil { + writeError(w, http.StatusBadGateway, err.Error()) + return + } + records := s.buildResticBackupRecords(r.Context(), namespace, jobs, s.cfg.ResticRepository) + writeJSON(w, http.StatusOK, api.BackupListResponse{ + Namespace: namespace, + PVC: pvcName, + Volume: volumeName, + Backups: records, + }) + default: + writeError(w, http.StatusBadRequest, "unsupported backup driver") + } +} + +func (s *Server) handleBackup(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + r.Body = http.MaxBytesReader(w, r.Body, 1<<20) + var req api.BackupRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "invalid_json") + writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) + return + } + if strings.TrimSpace(req.Namespace) == "" || strings.TrimSpace(req.PVC) == "" { + s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, "namespace and pvc are required") + return + } + if err := validateKeepLast(req.KeepLast); err != nil { + s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, err.Error()) + return + } + req.Namespace = strings.TrimSpace(req.Namespace) + req.PVC = strings.TrimSpace(req.PVC) + requester := currentRequester(r.Context()) + + response, result, err := s.executeBackup(r.Context(), req, requester) + s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result) + if err != nil 
{ + writeError(w, backupStatusCode(result), err.Error()) + return + } + writeJSON(w, http.StatusOK, response) +} + +func (s *Server) handleNamespaceBackup(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + r.Body = http.MaxBytesReader(w, r.Body, 1<<20) + var req api.NamespaceBackupRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "invalid_json") + writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) + return + } + req.Namespace = strings.TrimSpace(req.Namespace) + if req.Namespace == "" { + s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, "namespace is required") + return + } + if err := validateKeepLast(req.KeepLast); err != nil { + s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, err.Error()) + return + } + if err := validateKubernetesName("namespace", req.Namespace); err != nil { + s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + pvcs, err := s.listNamespaceBoundPVCs(r.Context(), req.Namespace) + if err != nil { + s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "backend_error") + writeError(w, http.StatusBadGateway, err.Error()) + return + } + + requester := currentRequester(r.Context()) + resolvedDedupe := dedupeDefault(req.Dedupe) + resolvedKeepLast := keepLastDefault(req.KeepLast) + response := api.NamespaceBackupResponse{ + Namespace: req.Namespace, + RequestedBy: requester, + Driver: s.cfg.BackupDriver, + DryRun: req.DryRun, + Dedupe: resolvedDedupe, + KeepLast: resolvedKeepLast, + Results: make([]api.NamespaceBackupResult, 0, len(pvcs)), + } + + for _, pvc := range pvcs { + backupReq := 
api.BackupRequest{ + Namespace: req.Namespace, + PVC: pvc.Name, + DryRun: req.DryRun, + Dedupe: boolPtr(resolvedDedupe), + KeepLast: intPtr(resolvedKeepLast), + } + result, status, execErr := s.executeBackup(r.Context(), backupReq, requester) + s.metrics.RecordBackupRequest(s.cfg.BackupDriver, status) + + item := api.NamespaceBackupResult{ + Namespace: req.Namespace, + PVC: pvc.Name, + Status: status, + Volume: result.Volume, + Backup: result.Backup, + } + if execErr != nil { + item.Error = execErr.Error() + response.Failed++ + } else { + response.Succeeded++ + } + response.Results = append(response.Results, item) + } + + response.Total = len(response.Results) + s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, namespaceResultStatus(req.DryRun, response.Total, response.Succeeded, response.Failed)) + writeJSON(w, http.StatusOK, response) +} + +func (s *Server) executeBackup(ctx context.Context, req api.BackupRequest, requester string) (api.BackupResponse, string, error) { + req.Namespace = strings.TrimSpace(req.Namespace) + req.PVC = strings.TrimSpace(req.PVC) + if req.Namespace == "" || req.PVC == "" { + return api.BackupResponse{}, "validation_error", fmt.Errorf("namespace and pvc are required") + } + resolvedDedupe := dedupeDefault(req.Dedupe) + resolvedKeepLast := keepLastDefault(req.KeepLast) + req.Dedupe = boolPtr(resolvedDedupe) + req.KeepLast = intPtr(resolvedKeepLast) + + switch s.cfg.BackupDriver { + case "longhorn": + volumeName, _, _, err := s.client.ResolvePVCVolume(ctx, req.Namespace, req.PVC) + if err != nil { + return api.BackupResponse{}, "validation_error", err + } + + backupID := backupName("backup", req.Namespace+"-"+req.PVC) + response := api.BackupResponse{ + Driver: "longhorn", + Volume: volumeName, + Backup: backupID, + Namespace: req.Namespace, + RequestedBy: requester, + DryRun: req.DryRun, + Dedupe: resolvedDedupe, + KeepLast: resolvedKeepLast, + } + if req.DryRun { + return response, "dry_run", nil + } + + labels := 
map[string]string{ + "soteria.bstein.dev/namespace": req.Namespace, + "soteria.bstein.dev/pvc": req.PVC, + "soteria.bstein.dev/requested-by": requester, + } + if err := s.longhorn.CreateSnapshot(ctx, volumeName, backupID, labels); err != nil { + return api.BackupResponse{}, "backend_error", err + } + if _, err := s.longhorn.SnapshotBackup(ctx, volumeName, backupID, labels, s.cfg.LonghornBackupMode); err != nil { + return api.BackupResponse{}, "backend_error", err + } + return response, "success", nil + case "restic": + jobName, secretName, err := s.client.CreateBackupJob(ctx, s.cfg, req) + if err != nil { + return api.BackupResponse{}, "backend_error", err + } + result := "success" + if req.DryRun { + result = "dry_run" + } + return api.BackupResponse{ + Driver: "restic", + JobName: jobName, + Namespace: req.Namespace, + Secret: secretName, + RequestedBy: requester, + DryRun: req.DryRun, + Dedupe: resolvedDedupe, + KeepLast: resolvedKeepLast, + }, result, nil + default: + return api.BackupResponse{}, "unsupported_driver", fmt.Errorf("unsupported backup driver") + } +} + +func (s *Server) listNamespaceBoundPVCs(ctx context.Context, namespace string) ([]k8s.PVCSummary, error) { + items, err := s.client.ListBoundPVCs(ctx) + if err != nil { + return nil, err + } + filtered := make([]k8s.PVCSummary, 0, len(items)) + for _, item := range items { + if item.Namespace == namespace { + filtered = append(filtered, item) + } + } + return filtered, nil +} + +func backupStatusCode(result string) int { + switch result { + case "validation_error", "unsupported_driver": + return http.StatusBadRequest + case "backend_error": + return http.StatusBadGateway + default: + return http.StatusInternalServerError + } +} + +func namespaceResultStatus(dryRun bool, total, succeeded, failed int) string { + if dryRun { + return "dry_run" + } + if total == 0 { + return "empty" + } + if failed == 0 { + return "success" + } + if succeeded == 0 { + return "failed" + } + return "partial" +} diff 
--git a/internal/server/inventory_builder.go b/internal/server/inventory_builder.go new file mode 100644 index 0000000..c2371de --- /dev/null +++ b/internal/server/inventory_builder.go @@ -0,0 +1,266 @@ +package server + +import ( + "context" + "log" + "sort" + "strings" + "time" + + "scm.bstein.dev/bstein/soteria/internal/api" + "scm.bstein.dev/bstein/soteria/internal/k8s" +) + +func (s *Server) buildInventory(ctx context.Context) (api.InventoryResponse, error) { + pvcs, err := s.client.ListBoundPVCs(ctx) + if err != nil { + return api.InventoryResponse{}, err + } + + resticJobsByPVC, resticLookupErrors := s.prefetchResticBackupJobs(ctx, pvcs) + + groups := make(map[string][]api.PVCInventory) + for _, summary := range pvcs { + entry := api.PVCInventory{ + Namespace: summary.Namespace, + PVC: summary.Name, + Volume: summary.VolumeName, + Phase: summary.Phase, + StorageClass: summary.StorageClass, + Capacity: summary.Capacity, + AccessModes: summary.AccessModes, + Driver: s.cfg.BackupDriver, + } + s.enrichPVCInventory(ctx, &entry, resticJobsByPVC, resticLookupErrors) + groups[summary.Namespace] = append(groups[summary.Namespace], entry) + } + + namespaceNames := make([]string, 0, len(groups)) + for namespace := range groups { + namespaceNames = append(namespaceNames, namespace) + } + sort.Strings(namespaceNames) + + response := api.InventoryResponse{ + GeneratedAt: time.Now().UTC().Format(time.RFC3339), + Namespaces: make([]api.NamespaceInventory, 0, len(namespaceNames)), + } + for _, namespace := range namespaceNames { + response.Namespaces = append(response.Namespaces, api.NamespaceInventory{ + Name: namespace, + PVCs: groups[namespace], + }) + } + return response, nil +} + +func (s *Server) prefetchResticBackupJobs(ctx context.Context, pvcs []k8s.PVCSummary) (map[string][]k8s.BackupJobSummary, map[string]error) { + if s.cfg.BackupDriver != "restic" { + return nil, nil + } + + namespaces := map[string]struct{}{} + for _, pvc := range pvcs { + 
namespaces[pvc.Namespace] = struct{}{} + } + + namespaceNames := make([]string, 0, len(namespaces)) + for namespace := range namespaces { + namespaceNames = append(namespaceNames, namespace) + } + sort.Strings(namespaceNames) + + jobsByPVC := map[string][]k8s.BackupJobSummary{} + lookupErrors := map[string]error{} + for _, namespace := range namespaceNames { + jobs, err := s.client.ListBackupJobs(ctx, namespace) + if err != nil { + lookupErrors[namespace] = err + continue + } + for _, job := range jobs { + key := job.Namespace + "/" + job.PVC + jobsByPVC[key] = append(jobsByPVC[key], job) + } + } + + for key := range jobsByPVC { + sortBackupJobsNewestFirst(jobsByPVC[key]) + } + + return jobsByPVC, lookupErrors +} + +func (s *Server) enrichPVCInventory( + ctx context.Context, + entry *api.PVCInventory, + resticJobsByPVC map[string][]k8s.BackupJobSummary, + resticLookupErrors map[string]error, +) { + switch s.cfg.BackupDriver { + case "longhorn": + backups, err := s.longhorn.ListBackups(ctx, entry.Volume) + if err != nil { + entry.Healthy = false + entry.HealthReason = "lookup_failed" + entry.Error = err.Error() + return + } + entry.BackupCount = len(backups) + totalBackupSize := int64(0) + completedBackups := 0 + for _, backup := range backups { + totalBackupSize += parseSizeBytes(backup.Size) + if strings.EqualFold(backup.State, "Completed") { + completedBackups++ + } + } + entry.CompletedBackups = completedBackups + entry.TotalBackupSizeBytes = float64(totalBackupSize) + latest, latestTime, ok := latestCompletedBackup(backups) + if !ok { + entry.Healthy = false + if len(backups) == 0 { + entry.HealthReason = "missing" + } else { + entry.HealthReason = "no_completed" + } + return + } + entry.LastBackupAt = latest.Created + entry.LastBackupSizeBytes = float64(parseSizeBytes(latest.Size)) + if latestTime.IsZero() { + entry.Healthy = false + entry.HealthReason = "unknown_timestamp" + return + } + entry.LastBackupAgeHours = roundHours(time.Since(latestTime).Hours()) + 
entry.Healthy = time.Since(latestTime) <= s.cfg.BackupMaxAge + if entry.Healthy { + entry.HealthReason = "fresh" + } else { + entry.HealthReason = "stale" + } + case "restic": + if err, hasErr := resticLookupErrors[entry.Namespace]; hasErr { + entry.Healthy = false + entry.HealthReason = "lookup_failed" + entry.Error = err.Error() + return + } + + key := entry.Namespace + "/" + entry.PVC + jobs := resticJobsByPVC[key] + if jobs == nil { + jobs = []k8s.BackupJobSummary{} + } + entry.BackupCount = len(jobs) + if len(jobs) > 0 { + entry.LastJobName = jobs[0].Name + entry.LastJobState = jobs[0].State + entry.LastJobProgressPct = backupJobProgressPct(jobs[0].State) + if !jobs[0].CreatedAt.IsZero() { + entry.LastJobStartedAt = jobs[0].CreatedAt.UTC().Format(time.RFC3339) + } + } + + completed := make([]k8s.BackupJobSummary, 0, len(jobs)) + active := 0 + for _, job := range jobs { + if backupJobInProgress(job.State) { + active++ + } + if strings.EqualFold(job.State, "Completed") { + completed = append(completed, job) + } + } + entry.ActiveBackups = active + entry.CompletedBackups = len(completed) + sizeSamples := completed + if len(sizeSamples) > 0 { + retained := sizeSamples[0].KeepLast + if retained > 0 && retained < len(sizeSamples) { + sizeSamples = sizeSamples[:retained] + } + } + totalStoredBytes := 0.0 + storedSamples := 0 + for index, job := range sizeSamples { + if index >= maxUsageSampleJobs { + break + } + storedBytes, ok := s.lookupResticStoredBytesForJob(ctx, entry.Namespace, job.Name) + if !ok { + continue + } + if index == 0 { + entry.LastBackupSizeBytes = storedBytes + } + totalStoredBytes += storedBytes + storedSamples++ + } + if storedSamples > 0 { + entry.TotalBackupSizeBytes = totalStoredBytes + } + if len(completed) == 0 { + entry.Healthy = false + switch { + case active > 0: + entry.HealthReason = "in_progress" + case len(jobs) == 0: + entry.HealthReason = "missing" + default: + entry.HealthReason = "no_completed" + } + return + } + + latestTime := 
backupJobTimestamp(completed[0]) + if latestTime.IsZero() { + entry.Healthy = false + entry.HealthReason = "unknown_timestamp" + return + } + entry.LastBackupAt = latestTime.UTC().Format(time.RFC3339) + entry.LastBackupAgeHours = roundHours(time.Since(latestTime).Hours()) + entry.Healthy = time.Since(latestTime) <= s.cfg.BackupMaxAge + if entry.Healthy { + entry.HealthReason = "fresh" + } else { + entry.HealthReason = "stale" + } + default: + entry.Healthy = false + entry.HealthReason = "unsupported_driver" + } +} + +func sortBackupJobsNewestFirst(items []k8s.BackupJobSummary) { + sort.Slice(items, func(i, j int) bool { + left := items[i].CompletionTime + if left.IsZero() { + left = items[i].CreatedAt + } + right := items[j].CompletionTime + if right.IsZero() { + right = items[j].CreatedAt + } + if left.Equal(right) { + return items[i].Name > items[j].Name + } + return left.After(right) + }) +} + +func (s *Server) refreshTelemetry(ctx context.Context) { + refreshCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + inventory, err := s.buildInventory(refreshCtx) + if err != nil { + log.Printf("inventory refresh failed: %v", err) + s.metrics.RecordInventoryFailure() + return + } + s.metrics.RecordInventory(inventory) +} diff --git a/internal/server/policy_http_handlers.go b/internal/server/policy_http_handlers.go new file mode 100644 index 0000000..0bdc131 --- /dev/null +++ b/internal/server/policy_http_handlers.go @@ -0,0 +1,60 @@ +package server + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + + "scm.bstein.dev/bstein/soteria/internal/api" +) + +func (s *Server) handlePolicies(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + writeJSON(w, http.StatusOK, api.BackupPolicyListResponse{Policies: s.listPolicies()}) + case http.MethodPost: + r.Body = http.MaxBytesReader(w, r.Body, 1<<20) + var req api.BackupPolicyUpsertRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != 
nil { + writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) + return + } + policy, err := s.upsertPolicy(r.Context(), req) + if err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + writeJSON(w, http.StatusOK, policy) + default: + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + } +} + +func (s *Server) handlePolicyByID(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodDelete { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + encodedID := strings.TrimPrefix(r.URL.Path, "/v1/policies/") + if encodedID == "" { + writeError(w, http.StatusBadRequest, "policy id is required") + return + } + id, err := url.PathUnescape(encodedID) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid policy id") + return + } + removed, err := s.deletePolicy(r.Context(), id) + if err != nil { + writeError(w, http.StatusBadGateway, err.Error()) + return + } + if !removed { + writeError(w, http.StatusNotFound, "policy not found") + return + } + writeJSON(w, http.StatusOK, map[string]string{"deleted": id}) +} diff --git a/internal/server/policy_runtime.go b/internal/server/policy_runtime.go new file mode 100644 index 0000000..76fcc18 --- /dev/null +++ b/internal/server/policy_runtime.go @@ -0,0 +1,379 @@ +package server + +import ( + "context" + "encoding/json" + "fmt" + "log" + "sort" + "strings" + "time" + + "scm.bstein.dev/bstein/soteria/internal/api" +) + +func (s *Server) runPolicyCycle(ctx context.Context) { + if !s.beginRun() { + return + } + defer s.endRun() + + policies := s.activePolicies() + if len(policies) == 0 { + return + } + + runCtx, cancel := context.WithTimeout(ctx, 3*time.Minute) + defer cancel() + + inventory, err := s.buildInventory(runCtx) + if err != nil { + log.Printf("policy cycle inventory failed: %v", err) + s.metrics.RecordPolicyBackup("inventory_error") + return + } + + pvcMap := make(map[string]api.PVCInventory) + namespaceMap := 
make(map[string][]api.PVCInventory) + for _, group := range inventory.Namespaces { + for _, pvc := range group.PVCs { + key := pvc.Namespace + "/" + pvc.PVC + pvcMap[key] = pvc + namespaceMap[pvc.Namespace] = append(namespaceMap[pvc.Namespace], pvc) + } + } + + type effectivePolicy struct { + IntervalHours float64 + Dedupe bool + KeepLast int + } + effectivePolicies := map[string]effectivePolicy{} + for _, policy := range policies { + matches := []api.PVCInventory{} + if policy.PVC != "" { + if pvc, ok := pvcMap[policy.Namespace+"/"+policy.PVC]; ok { + matches = append(matches, pvc) + } + } else { + matches = append(matches, namespaceMap[policy.Namespace]...) + } + + for _, pvc := range matches { + key := pvc.Namespace + "/" + pvc.PVC + current, exists := effectivePolicies[key] + if !exists || policy.IntervalHours < current.IntervalHours || (policy.IntervalHours == current.IntervalHours && keepLastStricter(policy.KeepLast, current.KeepLast)) { + effectivePolicies[key] = effectivePolicy{ + IntervalHours: policy.IntervalHours, + Dedupe: policy.Dedupe, + KeepLast: policy.KeepLast, + } + } + } + } + + for key, effective := range effectivePolicies { + pvc, ok := pvcMap[key] + if !ok { + continue + } + // Never enqueue a new policy backup while one is already active for this PVC. + // This prevents runaway job storms when a backup is stuck Pending/Running. + if pvc.ActiveBackups > 0 { + s.metrics.RecordPolicyBackup("in_progress") + continue + } + + lastRunRef := strings.TrimSpace(pvc.LastBackupAt) + if lastRunRef == "" { + // If no successful backup exists yet, fall back to the most recent job start + // so failed attempts are still throttled by interval_hours. 
+ lastRunRef = strings.TrimSpace(pvc.LastJobStartedAt) + } + + if !backupDue(lastRunRef, effective.IntervalHours) { + s.metrics.RecordPolicyBackup("not_due") + continue + } + + _, result, err := s.executeBackup(runCtx, api.BackupRequest{ + Namespace: pvc.Namespace, + PVC: pvc.PVC, + DryRun: false, + Dedupe: boolPtr(effective.Dedupe), + KeepLast: intPtr(effective.KeepLast), + }, "policy-scheduler") + s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result) + if err != nil { + s.metrics.RecordPolicyBackup(result) + log.Printf("policy backup failed for %s/%s: %v", pvc.Namespace, pvc.PVC, err) + continue + } + s.metrics.RecordPolicyBackup("success") + } +} + +func (s *Server) beginRun() bool { + s.runMu.Lock() + defer s.runMu.Unlock() + if s.running { + return false + } + s.running = true + return true +} + +func (s *Server) endRun() { + s.runMu.Lock() + defer s.runMu.Unlock() + s.running = false +} + +func (s *Server) loadPolicies(ctx context.Context) error { + raw, err := s.client.LoadSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey) + if err != nil { + return err + } + if len(raw) == 0 { + return nil + } + + var doc struct { + Policies []struct { + ID string `json:"id"` + Namespace string `json:"namespace"` + PVC string `json:"pvc,omitempty"` + IntervalHours float64 `json:"interval_hours"` + Enabled bool `json:"enabled"` + Dedupe *bool `json:"dedupe,omitempty"` + KeepLast *int `json:"keep_last,omitempty"` + CreatedAt string `json:"created_at,omitempty"` + UpdatedAt string `json:"updated_at,omitempty"` + } `json:"policies"` + } + if err := json.Unmarshal(raw, &doc); err != nil { + return fmt.Errorf("decode policy document: %w", err) + } + + next := map[string]api.BackupPolicy{} + now := time.Now().UTC().Format(time.RFC3339) + for _, policy := range doc.Policies { + namespace := strings.TrimSpace(policy.Namespace) + pvc := strings.TrimSpace(policy.PVC) + if namespace == "" { + continue + } + interval := policy.IntervalHours + if interval <= 0 
{ + interval = defaultPolicyHours + } + dedupe := true + if policy.Dedupe != nil { + dedupe = *policy.Dedupe + } + keepLast := keepLastDefault(policy.KeepLast) + id := policyKey(namespace, pvc) + createdAt := policy.CreatedAt + if createdAt == "" { + createdAt = now + } + updatedAt := policy.UpdatedAt + if updatedAt == "" { + updatedAt = createdAt + } + next[id] = api.BackupPolicy{ + ID: id, + Namespace: namespace, + PVC: pvc, + IntervalHours: interval, + Enabled: policy.Enabled, + Dedupe: dedupe, + KeepLast: keepLast, + CreatedAt: createdAt, + UpdatedAt: updatedAt, + } + } + + s.policyMu.Lock() + s.policies = next + s.policyMu.Unlock() + return nil +} + +func (s *Server) persistPolicies(ctx context.Context, policies []api.BackupPolicy) error { + doc := struct { + Policies []api.BackupPolicy `json:"policies"` + }{ + Policies: policies, + } + payload, err := json.Marshal(doc) + if err != nil { + return fmt.Errorf("encode policy document: %w", err) + } + return s.client.SaveSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey, payload, map[string]string{ + "app.kubernetes.io/name": "soteria", + "app.kubernetes.io/component": "policy-store", + }) +} + +func (s *Server) listPolicies() []api.BackupPolicy { + s.policyMu.RLock() + defer s.policyMu.RUnlock() + policies := make([]api.BackupPolicy, 0, len(s.policies)) + for _, policy := range s.policies { + policies = append(policies, policy) + } + sort.Slice(policies, func(i, j int) bool { + if policies[i].Namespace != policies[j].Namespace { + return policies[i].Namespace < policies[j].Namespace + } + if policies[i].PVC != policies[j].PVC { + return policies[i].PVC < policies[j].PVC + } + return policies[i].ID < policies[j].ID + }) + return policies +} + +func (s *Server) activePolicies() []api.BackupPolicy { + policies := s.listPolicies() + filtered := make([]api.BackupPolicy, 0, len(policies)) + for _, policy := range policies { + if policy.Enabled { + filtered = append(filtered, policy) + } + } + 
return filtered +} + +func (s *Server) upsertPolicy(ctx context.Context, req api.BackupPolicyUpsertRequest) (api.BackupPolicy, error) { + namespace := strings.TrimSpace(req.Namespace) + pvc := strings.TrimSpace(req.PVC) + if namespace == "" { + return api.BackupPolicy{}, fmt.Errorf("namespace is required") + } + if err := validateKubernetesName("namespace", namespace); err != nil { + return api.BackupPolicy{}, err + } + if pvc != "" { + if err := validateKubernetesName("pvc", pvc); err != nil { + return api.BackupPolicy{}, err + } + } + + interval := req.IntervalHours + if interval <= 0 { + interval = defaultPolicyHours + } + if interval > maxPolicyIntervalHrs { + return api.BackupPolicy{}, fmt.Errorf("interval_hours must be <= %d", maxPolicyIntervalHrs) + } + enabled := true + if req.Enabled != nil { + enabled = *req.Enabled + } + dedupe := dedupeDefault(req.Dedupe) + if err := validateKeepLast(req.KeepLast); err != nil { + return api.BackupPolicy{}, err + } + keepLast := keepLastDefault(req.KeepLast) + + id := policyKey(namespace, pvc) + now := time.Now().UTC().Format(time.RFC3339) + + s.policyMu.Lock() + before := clonePolicyMap(s.policies) + createdAt := now + if existing, ok := s.policies[id]; ok && existing.CreatedAt != "" { + createdAt = existing.CreatedAt + } + policy := api.BackupPolicy{ + ID: id, + Namespace: namespace, + PVC: pvc, + IntervalHours: interval, + Enabled: enabled, + Dedupe: dedupe, + KeepLast: keepLast, + CreatedAt: createdAt, + UpdatedAt: now, + } + s.policies[id] = policy + snapshot := policySliceFromMap(s.policies) + s.policyMu.Unlock() + + if err := s.persistPolicies(ctx, snapshot); err != nil { + s.policyMu.Lock() + s.policies = before + s.policyMu.Unlock() + return api.BackupPolicy{}, err + } + return policy, nil +} + +func (s *Server) deletePolicy(ctx context.Context, id string) (bool, error) { + id = strings.TrimSpace(id) + if id == "" { + return false, nil + } + + s.policyMu.Lock() + if _, ok := s.policies[id]; !ok { + 
s.policyMu.Unlock() + return false, nil + } + before := clonePolicyMap(s.policies) + delete(s.policies, id) + snapshot := policySliceFromMap(s.policies) + s.policyMu.Unlock() + + if err := s.persistPolicies(ctx, snapshot); err != nil { + s.policyMu.Lock() + s.policies = before + s.policyMu.Unlock() + return false, err + } + return true, nil +} + +func policyKey(namespace, pvc string) string { + scope := strings.TrimSpace(pvc) + if scope == "" { + scope = "_all" + } + return strings.TrimSpace(namespace) + "__" + scope +} + +func policySliceFromMap(source map[string]api.BackupPolicy) []api.BackupPolicy { + out := make([]api.BackupPolicy, 0, len(source)) + for _, policy := range source { + out = append(out, policy) + } + sort.Slice(out, func(i, j int) bool { + return out[i].ID < out[j].ID + }) + return out +} + +func clonePolicyMap(source map[string]api.BackupPolicy) map[string]api.BackupPolicy { + cloned := make(map[string]api.BackupPolicy, len(source)) + for key, value := range source { + cloned[key] = value + } + return cloned +} + +func backupDue(lastBackupAt string, intervalHours float64) bool { + if intervalHours <= 0 { + intervalHours = defaultPolicyHours + } + if strings.TrimSpace(lastBackupAt) == "" { + return true + } + timestamp, ok := parseBackupTime(lastBackupAt) + if !ok { + return true + } + interval := time.Duration(intervalHours * float64(time.Hour)) + return time.Since(timestamp) >= interval +} diff --git a/internal/server/restic_usage_store.go b/internal/server/restic_usage_store.go new file mode 100644 index 0000000..a7d6302 --- /dev/null +++ b/internal/server/restic_usage_store.go @@ -0,0 +1,259 @@ +package server + +import ( + "context" + "encoding/json" + "fmt" + "log" + "math" + "regexp" + "sort" + "strconv" + "strings" + "time" +) + +var ( + resticAddedStoredPattern = regexp.MustCompile(`(?mi)added to the (?:repository|repo):[^\n]*\(([^)]+)\s+stored\)`) + resticDataAddedPattern = regexp.MustCompile(`(?m)"data_added":\s*([0-9]+)`) +) + +func (s 
*Server) lookupResticStoredBytesForJob(ctx context.Context, namespace, jobName string) (float64, bool) { + key := namespace + "/" + jobName + + s.jobUsageMu.RLock() + cached, ok := s.jobUsage[key] + s.jobUsageMu.RUnlock() + if ok && time.Since(cached.CheckedAt) < 15*time.Minute { + return cached.Bytes, cached.Known + } + + if bytes, known := s.lookupPersistedResticUsage(key); known { + entry := resticJobUsageCacheEntry{ + Known: true, + Bytes: bytes, + CheckedAt: time.Now().UTC(), + } + s.jobUsageMu.Lock() + if s.jobUsage == nil { + s.jobUsage = map[string]resticJobUsageCacheEntry{} + } + s.jobUsage[key] = entry + s.jobUsageMu.Unlock() + return bytes, true + } + + logBody, err := s.client.ReadBackupJobLog(ctx, namespace, jobName) + entry := resticJobUsageCacheEntry{ + Known: false, + Bytes: 0, + CheckedAt: time.Now().UTC(), + } + if err == nil { + if parsedBytes, parsed := parseResticStoredBytes(logBody); parsed { + entry.Known = true + entry.Bytes = parsedBytes + s.storePersistedResticUsage(ctx, key, parsedBytes) + } + } + + s.jobUsageMu.Lock() + if s.jobUsage == nil { + s.jobUsage = map[string]resticJobUsageCacheEntry{} + } + s.jobUsage[key] = entry + s.jobUsageMu.Unlock() + + return entry.Bytes, entry.Known +} + +func parseResticStoredBytes(logBody string) (float64, bool) { + if logBody == "" { + return 0, false + } + matches := resticDataAddedPattern.FindAllStringSubmatch(logBody, -1) + if len(matches) > 0 { + last := matches[len(matches)-1] + if len(last) > 1 { + if value, err := strconv.ParseFloat(strings.TrimSpace(last[1]), 64); err == nil { + return value, true + } + } + } + + textMatches := resticAddedStoredPattern.FindAllStringSubmatch(logBody, -1) + if len(textMatches) == 0 { + return 0, false + } + last := textMatches[len(textMatches)-1] + if len(last) < 2 { + return 0, false + } + return parseHumanByteSize(last[1]) +} + +func parseHumanByteSize(raw string) (float64, bool) { + parts := strings.Fields(strings.TrimSpace(raw)) + if len(parts) < 2 { + 
return 0, false + } + value, err := strconv.ParseFloat(strings.ReplaceAll(parts[0], ",", ""), 64) + if err != nil { + return 0, false + } + unit := strings.ToUpper(strings.TrimSpace(parts[1])) + switch unit { + case "B": + return value, true + case "KIB": + return value * 1024, true + case "MIB": + return value * 1024 * 1024, true + case "GIB": + return value * 1024 * 1024 * 1024, true + case "TIB": + return value * 1024 * 1024 * 1024 * 1024, true + case "KB": + return value * 1000, true + case "MB": + return value * 1000 * 1000, true + case "GB": + return value * 1000 * 1000 * 1000, true + case "TB": + return value * 1000 * 1000 * 1000 * 1000, true + default: + return 0, false + } +} + +func (s *Server) loadResticUsage(ctx context.Context) error { + if strings.TrimSpace(s.cfg.UsageSecretName) == "" { + return nil + } + raw, err := s.client.LoadSecretData(ctx, s.cfg.Namespace, s.cfg.UsageSecretName, usageSecretKey) + if err != nil { + return err + } + if len(raw) == 0 { + return nil + } + + var doc resticPersistedUsageDocument + if err := json.Unmarshal(raw, &doc); err != nil { + return fmt.Errorf("decode restic usage document: %w", err) + } + next := map[string]resticPersistedUsageEntry{} + for _, item := range doc.Jobs { + key := strings.TrimSpace(item.Key) + if key == "" || item.Bytes < 0 || math.IsNaN(item.Bytes) || math.IsInf(item.Bytes, 0) { + continue + } + next[key] = resticPersistedUsageEntry{ + Bytes: item.Bytes, + UpdatedAt: strings.TrimSpace(item.UpdatedAt), + } + } + + s.usageMu.Lock() + s.usageStore = next + s.usageMu.Unlock() + return nil +} + +func (s *Server) lookupPersistedResticUsage(key string) (float64, bool) { + s.usageMu.RLock() + defer s.usageMu.RUnlock() + if s.usageStore == nil { + return 0, false + } + entry, ok := s.usageStore[key] + if !ok { + return 0, false + } + if entry.Bytes < 0 || math.IsNaN(entry.Bytes) || math.IsInf(entry.Bytes, 0) { + return 0, false + } + return entry.Bytes, true +} + +func (s *Server) 
storePersistedResticUsage(ctx context.Context, key string, value float64) { + if key == "" || value < 0 || math.IsNaN(value) || math.IsInf(value, 0) { + return + } + now := time.Now().UTC().Format(time.RFC3339) + changed := false + + s.usageMu.Lock() + if s.usageStore == nil { + s.usageStore = map[string]resticPersistedUsageEntry{} + } + current, exists := s.usageStore[key] + if !exists || current.Bytes != value || strings.TrimSpace(current.UpdatedAt) == "" { + s.usageStore[key] = resticPersistedUsageEntry{ + Bytes: value, + UpdatedAt: now, + } + changed = true + } + s.usageMu.Unlock() + + if !changed { + return + } + if err := s.persistResticUsage(ctx); err != nil { + log.Printf("persist restic usage failed: %v", err) + } +} + +func (s *Server) persistResticUsage(ctx context.Context) error { + if strings.TrimSpace(s.cfg.UsageSecretName) == "" { + return nil + } + s.usageMu.RLock() + entries := make([]struct { + Key string + Value resticPersistedUsageEntry + }, 0, len(s.usageStore)) + for key, value := range s.usageStore { + entries = append(entries, struct { + Key string + Value resticPersistedUsageEntry + }{Key: key, Value: value}) + } + s.usageMu.RUnlock() + + sort.Slice(entries, func(i, j int) bool { + return entries[i].Key < entries[j].Key + }) + + doc := resticPersistedUsageDocument{ + Jobs: make([]struct { + Key string `json:"key"` + Bytes float64 `json:"bytes"` + UpdatedAt string `json:"updated_at,omitempty"` + }, 0, len(entries)), + } + for _, entry := range entries { + if entry.Key == "" || entry.Value.Bytes < 0 || math.IsNaN(entry.Value.Bytes) || math.IsInf(entry.Value.Bytes, 0) { + continue + } + doc.Jobs = append(doc.Jobs, struct { + Key string `json:"key"` + Bytes float64 `json:"bytes"` + UpdatedAt string `json:"updated_at,omitempty"` + }{ + Key: entry.Key, + Bytes: entry.Value.Bytes, + UpdatedAt: strings.TrimSpace(entry.Value.UpdatedAt), + }) + } + + payload, err := json.Marshal(doc) + if err != nil { + return fmt.Errorf("encode restic usage 
document: %w", err) + } + return s.client.SaveSecretData(ctx, s.cfg.Namespace, s.cfg.UsageSecretName, usageSecretKey, payload, map[string]string{ + "app.kubernetes.io/name": "soteria", + "app.kubernetes.io/component": "usage-store", + }) +} diff --git a/internal/server/restore_handlers.go b/internal/server/restore_handlers.go new file mode 100644 index 0000000..d95814e --- /dev/null +++ b/internal/server/restore_handlers.go @@ -0,0 +1,316 @@ +package server + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "strings" + + "scm.bstein.dev/bstein/soteria/internal/api" +) + +func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + r.Body = http.MaxBytesReader(w, r.Body, 1<<20) + var req api.RestoreTestRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "invalid_json") + writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) + return + } + req.Namespace = strings.TrimSpace(req.Namespace) + req.PVC = strings.TrimSpace(req.PVC) + req.TargetPVC = strings.TrimSpace(req.TargetPVC) + req.TargetNamespace = strings.TrimSpace(req.TargetNamespace) + + if req.Namespace == "" { + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, "namespace is required") + return + } + if req.PVC == "" { + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, "pvc is required") + return + } + if req.TargetPVC == "" { + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, "target_pvc is required") + return + } + if req.TargetNamespace == "" { + req.TargetNamespace = req.Namespace + } + if err := validateKubernetesName("namespace", req.Namespace); err != nil { + 
s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, err.Error()) + return + } + if err := validateKubernetesName("pvc", req.PVC); err != nil { + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, err.Error()) + return + } + if err := validateKubernetesName("target_namespace", req.TargetNamespace); err != nil { + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, err.Error()) + return + } + if err := validateKubernetesName("target_pvc", req.TargetPVC); err != nil { + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, err.Error()) + return + } + if req.Namespace == req.TargetNamespace && req.PVC == req.TargetPVC { + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "conflict") + writeError(w, http.StatusConflict, "target namespace/pvc must differ from source") + return + } + + requester := currentRequester(r.Context()) + response, result, err := s.executeRestore(r.Context(), req, requester) + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, result) + if err != nil { + writeError(w, restoreStatusCode(result), err.Error()) + return + } + writeJSON(w, http.StatusOK, response) +} + +func (s *Server) handleNamespaceRestore(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + r.Body = http.MaxBytesReader(w, r.Body, 1<<20) + var req api.NamespaceRestoreRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "invalid_json") + writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) + return + } + + req.Namespace = strings.TrimSpace(req.Namespace) + req.TargetNamespace = strings.TrimSpace(req.TargetNamespace) + req.TargetPrefix = 
strings.TrimSpace(req.TargetPrefix) + req.Snapshot = strings.TrimSpace(req.Snapshot) + + if req.Namespace == "" { + s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, "namespace is required") + return + } + if req.TargetNamespace == "" { + req.TargetNamespace = req.Namespace + } + if err := validateKubernetesName("namespace", req.Namespace); err != nil { + s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, err.Error()) + return + } + if err := validateKubernetesName("target_namespace", req.TargetNamespace); err != nil { + s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + pvcs, err := s.listNamespaceBoundPVCs(r.Context(), req.Namespace) + if err != nil { + s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "backend_error") + writeError(w, http.StatusBadGateway, err.Error()) + return + } + + requester := currentRequester(r.Context()) + response := api.NamespaceRestoreResponse{ + Namespace: req.Namespace, + TargetNamespace: req.TargetNamespace, + RequestedBy: requester, + Driver: s.cfg.BackupDriver, + DryRun: req.DryRun, + Results: make([]api.NamespaceRestoreResult, 0, len(pvcs)), + } + + for _, pvc := range pvcs { + targetPVC := targetPVCName(req.TargetPrefix, pvc.Name) + restoreReq := api.RestoreTestRequest{ + Namespace: req.Namespace, + PVC: pvc.Name, + Snapshot: req.Snapshot, + TargetNamespace: req.TargetNamespace, + TargetPVC: targetPVC, + DryRun: req.DryRun, + } + result, status, execErr := s.executeRestore(r.Context(), restoreReq, requester) + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, status) + + item := api.NamespaceRestoreResult{ + Namespace: req.Namespace, + PVC: pvc.Name, + TargetNamespace: req.TargetNamespace, + TargetPVC: targetPVC, + Status: status, + Volume: result.Volume, + BackupURL: result.BackupURL, + 
} + if execErr != nil { + item.Error = execErr.Error() + response.Failed++ + } else { + response.Succeeded++ + } + response.Results = append(response.Results, item) + } + + response.Total = len(response.Results) + s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, namespaceResultStatus(req.DryRun, response.Total, response.Succeeded, response.Failed)) + writeJSON(w, http.StatusOK, response) +} + +func (s *Server) executeRestore(ctx context.Context, req api.RestoreTestRequest, requester string) (api.RestoreTestResponse, string, error) { + req.Namespace = strings.TrimSpace(req.Namespace) + req.PVC = strings.TrimSpace(req.PVC) + req.TargetNamespace = strings.TrimSpace(req.TargetNamespace) + req.TargetPVC = strings.TrimSpace(req.TargetPVC) + req.BackupURL = strings.TrimSpace(req.BackupURL) + req.Snapshot = strings.TrimSpace(req.Snapshot) + + if req.TargetNamespace == "" { + req.TargetNamespace = req.Namespace + } + + switch s.cfg.BackupDriver { + case "longhorn": + exists, err := s.client.PersistentVolumeClaimExists(ctx, req.TargetNamespace, req.TargetPVC) + if err != nil { + return api.RestoreTestResponse{}, "validation_error", err + } + if exists { + return api.RestoreTestResponse{}, "conflict", fmt.Errorf("target pvc %s/%s already exists", req.TargetNamespace, req.TargetPVC) + } + + volumeName, _, _, err := s.client.ResolvePVCVolume(ctx, req.Namespace, req.PVC) + if err != nil { + return api.RestoreTestResponse{}, "validation_error", err + } + + backupURL := req.BackupURL + if backupURL == "" { + backup, err := s.longhorn.FindBackup(ctx, volumeName, req.Snapshot) + if err != nil { + return api.RestoreTestResponse{}, "validation_error", err + } + backupURL = strings.TrimSpace(backup.URL) + } + if backupURL == "" { + return api.RestoreTestResponse{}, "validation_error", fmt.Errorf("backup_url is required") + } + + restoreVolumeName := backupName("restore", req.TargetNamespace+"-"+req.TargetPVC) + response := api.RestoreTestResponse{ + Driver: "longhorn", + 
Volume: restoreVolumeName, + TargetNamespace: req.TargetNamespace, + TargetPVC: req.TargetPVC, + BackupURL: backupURL, + Namespace: req.Namespace, + RequestedBy: requester, + DryRun: req.DryRun, + } + if req.DryRun { + return response, "dry_run", nil + } + + sourceVolume, err := s.longhorn.GetVolume(ctx, volumeName) + if err != nil { + return api.RestoreTestResponse{}, "backend_error", err + } + replicas := sourceVolume.NumberOfReplicas + if replicas == 0 { + replicas = 2 + } + + if _, err := s.longhorn.CreateVolumeFromBackup(ctx, restoreVolumeName, sourceVolume.Size, replicas, backupURL); err != nil { + return api.RestoreTestResponse{}, "backend_error", err + } + if err := s.longhorn.CreatePVC(ctx, restoreVolumeName, req.TargetNamespace, req.TargetPVC); err != nil { + cleanupErr := s.longhorn.DeleteVolume(ctx, restoreVolumeName) + if cleanupErr != nil { + log.Printf("restore cleanup failed for %s: %v", restoreVolumeName, cleanupErr) + return api.RestoreTestResponse{}, "backend_error", fmt.Errorf("create restore pvc: %v (cleanup failed: %v)", err, cleanupErr) + } + return api.RestoreTestResponse{}, "backend_error", fmt.Errorf("create restore pvc: %v", err) + } + return response, "success", nil + case "restic": + if repo, snapshot, ok := decodeResticSelector(req.BackupURL); ok { + if strings.TrimSpace(req.Snapshot) == "" { + req.Snapshot = snapshot + } + if strings.TrimSpace(req.Repository) == "" { + req.Repository = repo + } + } + jobName, secretName, err := s.client.CreateRestoreJob(ctx, s.cfg, req) + if err != nil { + return api.RestoreTestResponse{}, "backend_error", err + } + result := "success" + if req.DryRun { + result = "dry_run" + } + return api.RestoreTestResponse{ + Driver: "restic", + JobName: jobName, + Namespace: req.Namespace, + TargetNamespace: req.TargetNamespace, + TargetPVC: req.TargetPVC, + Secret: secretName, + RequestedBy: requester, + DryRun: req.DryRun, + }, result, nil + default: + return api.RestoreTestResponse{}, "unsupported_driver", 
fmt.Errorf("unsupported backup driver") + } +} + +func restoreStatusCode(result string) int { + switch result { + case "validation_error", "unsupported_driver": + return http.StatusBadRequest + case "conflict": + return http.StatusConflict + case "backend_error": + return http.StatusBadGateway + default: + return http.StatusInternalServerError + } +} + +func targetPVCName(prefix, sourcePVC string) string { + prefix = sanitizeName(prefix) + if prefix == "" { + prefix = "restore" + } + if !strings.HasSuffix(prefix, "-") { + prefix += "-" + } + name := sanitizeName(prefix + sourcePVC) + if name == "" { + name = "restore" + } + if len(name) > 63 { + name = strings.Trim(name[:63], "-") + } + if name == "" { + name = "restore" + } + return name +} diff --git a/internal/server/server.go b/internal/server/server.go index c50f8e2..b1899d4 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -2,16 +2,8 @@ package server import ( "context" - "encoding/base64" - "encoding/json" - "fmt" "log" - "math" "net/http" - "net/url" - "regexp" - "sort" - "strconv" "strings" "sync" "time" @@ -22,8 +14,6 @@ import ( "scm.bstein.dev/bstein/soteria/internal/longhorn" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - k8svalidation "k8s.io/apimachinery/pkg/util/validation" ) type kubeClient interface { @@ -109,11 +99,6 @@ type resticPersistedUsageDocument struct { } `json:"jobs"` } -var ( - resticAddedStoredPattern = regexp.MustCompile(`(?mi)added to the (?:repository|repo):[^\n]*\(([^)]+)\s+stored\)`) - resticDataAddedPattern = regexp.MustCompile(`(?m)"data_added":\s*([0-9]+)`) -) - func New(cfg *config.Config, client *k8s.Client, lh *longhorn.Client) *Server { s := &Server{ cfg: cfg, @@ -271,1933 +256,3 @@ func (s *Server) handleWhoAmI(w http.ResponseWriter, r *http.Request) { AllowedGroups: s.cfg.AllowedGroups, }) } - -func (s *Server) handleInventory(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - writeError(w, 
http.StatusMethodNotAllowed, "method not allowed") - return - } - inventory, err := s.buildInventory(r.Context()) - if err != nil { - writeError(w, http.StatusBadGateway, err.Error()) - return - } - writeJSON(w, http.StatusOK, inventory) -} - -func (s *Server) handleBackups(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - namespace := strings.TrimSpace(r.URL.Query().Get("namespace")) - pvcName := strings.TrimSpace(r.URL.Query().Get("pvc")) - if namespace == "" || pvcName == "" { - writeError(w, http.StatusBadRequest, "namespace and pvc are required") - return - } - - volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), namespace, pvcName) - if err != nil { - writeError(w, http.StatusBadRequest, err.Error()) - return - } - - switch s.cfg.BackupDriver { - case "longhorn": - backups, err := s.longhorn.ListBackups(r.Context(), volumeName) - if err != nil { - writeError(w, http.StatusBadGateway, err.Error()) - return - } - writeJSON(w, http.StatusOK, api.BackupListResponse{ - Namespace: namespace, - PVC: pvcName, - Volume: volumeName, - Backups: buildBackupRecords(backups), - }) - case "restic": - jobs, err := s.client.ListBackupJobsForPVC(r.Context(), namespace, pvcName) - if err != nil { - writeError(w, http.StatusBadGateway, err.Error()) - return - } - records := s.buildResticBackupRecords(r.Context(), namespace, jobs, s.cfg.ResticRepository) - writeJSON(w, http.StatusOK, api.BackupListResponse{ - Namespace: namespace, - PVC: pvcName, - Volume: volumeName, - Backups: records, - }) - default: - writeError(w, http.StatusBadRequest, "unsupported backup driver") - } -} - -func (s *Server) handleBackup(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - r.Body = http.MaxBytesReader(w, r.Body, 1<<20) - var req api.BackupRequest - if err := 
json.NewDecoder(r.Body).Decode(&req); err != nil { - s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "invalid_json") - writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) - return - } - if strings.TrimSpace(req.Namespace) == "" || strings.TrimSpace(req.PVC) == "" { - s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, "namespace and pvc are required") - return - } - if err := validateKeepLast(req.KeepLast); err != nil { - s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, err.Error()) - return - } - req.Namespace = strings.TrimSpace(req.Namespace) - req.PVC = strings.TrimSpace(req.PVC) - requester := currentRequester(r.Context()) - - response, result, err := s.executeBackup(r.Context(), req, requester) - s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result) - if err != nil { - writeError(w, backupStatusCode(result), err.Error()) - return - } - writeJSON(w, http.StatusOK, response) -} - -func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - r.Body = http.MaxBytesReader(w, r.Body, 1<<20) - var req api.RestoreTestRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "invalid_json") - writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) - return - } - req.Namespace = strings.TrimSpace(req.Namespace) - req.PVC = strings.TrimSpace(req.PVC) - req.TargetPVC = strings.TrimSpace(req.TargetPVC) - req.TargetNamespace = strings.TrimSpace(req.TargetNamespace) - - if req.Namespace == "" { - s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, "namespace is required") - return - } - if req.PVC == "" { - s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, 
"validation_error") - writeError(w, http.StatusBadRequest, "pvc is required") - return - } - if req.TargetPVC == "" { - s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, "target_pvc is required") - return - } - if req.TargetNamespace == "" { - req.TargetNamespace = req.Namespace - } - if err := validateKubernetesName("namespace", req.Namespace); err != nil { - s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, err.Error()) - return - } - if err := validateKubernetesName("pvc", req.PVC); err != nil { - s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, err.Error()) - return - } - if err := validateKubernetesName("target_namespace", req.TargetNamespace); err != nil { - s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, err.Error()) - return - } - if err := validateKubernetesName("target_pvc", req.TargetPVC); err != nil { - s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, err.Error()) - return - } - if req.Namespace == req.TargetNamespace && req.PVC == req.TargetPVC { - s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "conflict") - writeError(w, http.StatusConflict, "target namespace/pvc must differ from source") - return - } - - requester := currentRequester(r.Context()) - response, result, err := s.executeRestore(r.Context(), req, requester) - s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, result) - if err != nil { - writeError(w, restoreStatusCode(result), err.Error()) - return - } - writeJSON(w, http.StatusOK, response) -} - -func (s *Server) handleNamespaceBackup(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - r.Body = http.MaxBytesReader(w, r.Body, 
1<<20) - var req api.NamespaceBackupRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "invalid_json") - writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) - return - } - req.Namespace = strings.TrimSpace(req.Namespace) - if req.Namespace == "" { - s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, "namespace is required") - return - } - if err := validateKeepLast(req.KeepLast); err != nil { - s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, err.Error()) - return - } - if err := validateKubernetesName("namespace", req.Namespace); err != nil { - s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, err.Error()) - return - } - - pvcs, err := s.listNamespaceBoundPVCs(r.Context(), req.Namespace) - if err != nil { - s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "backend_error") - writeError(w, http.StatusBadGateway, err.Error()) - return - } - - requester := currentRequester(r.Context()) - resolvedDedupe := dedupeDefault(req.Dedupe) - resolvedKeepLast := keepLastDefault(req.KeepLast) - response := api.NamespaceBackupResponse{ - Namespace: req.Namespace, - RequestedBy: requester, - Driver: s.cfg.BackupDriver, - DryRun: req.DryRun, - Dedupe: resolvedDedupe, - KeepLast: resolvedKeepLast, - Results: make([]api.NamespaceBackupResult, 0, len(pvcs)), - } - - for _, pvc := range pvcs { - backupReq := api.BackupRequest{ - Namespace: req.Namespace, - PVC: pvc.Name, - DryRun: req.DryRun, - Dedupe: boolPtr(resolvedDedupe), - KeepLast: intPtr(resolvedKeepLast), - } - result, status, execErr := s.executeBackup(r.Context(), backupReq, requester) - s.metrics.RecordBackupRequest(s.cfg.BackupDriver, status) - - item := api.NamespaceBackupResult{ - Namespace: req.Namespace, - 
PVC: pvc.Name, - Status: status, - Volume: result.Volume, - Backup: result.Backup, - } - if execErr != nil { - item.Error = execErr.Error() - response.Failed++ - } else { - response.Succeeded++ - } - response.Results = append(response.Results, item) - } - - response.Total = len(response.Results) - s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, namespaceResultStatus(req.DryRun, response.Total, response.Succeeded, response.Failed)) - writeJSON(w, http.StatusOK, response) -} - -func (s *Server) handleNamespaceRestore(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - r.Body = http.MaxBytesReader(w, r.Body, 1<<20) - var req api.NamespaceRestoreRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "invalid_json") - writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) - return - } - - req.Namespace = strings.TrimSpace(req.Namespace) - req.TargetNamespace = strings.TrimSpace(req.TargetNamespace) - req.TargetPrefix = strings.TrimSpace(req.TargetPrefix) - req.Snapshot = strings.TrimSpace(req.Snapshot) - - if req.Namespace == "" { - s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, "namespace is required") - return - } - if req.TargetNamespace == "" { - req.TargetNamespace = req.Namespace - } - if err := validateKubernetesName("namespace", req.Namespace); err != nil { - s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, err.Error()) - return - } - if err := validateKubernetesName("target_namespace", req.TargetNamespace); err != nil { - s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "validation_error") - writeError(w, http.StatusBadRequest, err.Error()) - return - } - - pvcs, err := 
s.listNamespaceBoundPVCs(r.Context(), req.Namespace) - if err != nil { - s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "backend_error") - writeError(w, http.StatusBadGateway, err.Error()) - return - } - - requester := currentRequester(r.Context()) - response := api.NamespaceRestoreResponse{ - Namespace: req.Namespace, - TargetNamespace: req.TargetNamespace, - RequestedBy: requester, - Driver: s.cfg.BackupDriver, - DryRun: req.DryRun, - Results: make([]api.NamespaceRestoreResult, 0, len(pvcs)), - } - - for _, pvc := range pvcs { - targetPVC := targetPVCName(req.TargetPrefix, pvc.Name) - restoreReq := api.RestoreTestRequest{ - Namespace: req.Namespace, - PVC: pvc.Name, - Snapshot: req.Snapshot, - TargetNamespace: req.TargetNamespace, - TargetPVC: targetPVC, - DryRun: req.DryRun, - } - result, status, execErr := s.executeRestore(r.Context(), restoreReq, requester) - s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, status) - - item := api.NamespaceRestoreResult{ - Namespace: req.Namespace, - PVC: pvc.Name, - TargetNamespace: req.TargetNamespace, - TargetPVC: targetPVC, - Status: status, - Volume: result.Volume, - BackupURL: result.BackupURL, - } - if execErr != nil { - item.Error = execErr.Error() - response.Failed++ - } else { - response.Succeeded++ - } - response.Results = append(response.Results, item) - } - - response.Total = len(response.Results) - s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, namespaceResultStatus(req.DryRun, response.Total, response.Succeeded, response.Failed)) - writeJSON(w, http.StatusOK, response) -} - -func (s *Server) handlePolicies(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case http.MethodGet: - writeJSON(w, http.StatusOK, api.BackupPolicyListResponse{Policies: s.listPolicies()}) - case http.MethodPost: - r.Body = http.MaxBytesReader(w, r.Body, 1<<20) - var req api.BackupPolicyUpsertRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - writeError(w, 
http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) - return - } - policy, err := s.upsertPolicy(r.Context(), req) - if err != nil { - writeError(w, http.StatusBadRequest, err.Error()) - return - } - writeJSON(w, http.StatusOK, policy) - default: - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - } -} - -func (s *Server) handlePolicyByID(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodDelete { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - encodedID := strings.TrimPrefix(r.URL.Path, "/v1/policies/") - if encodedID == "" { - writeError(w, http.StatusBadRequest, "policy id is required") - return - } - id, err := url.PathUnescape(encodedID) - if err != nil { - writeError(w, http.StatusBadRequest, "invalid policy id") - return - } - removed, err := s.deletePolicy(r.Context(), id) - if err != nil { - writeError(w, http.StatusBadGateway, err.Error()) - return - } - if !removed { - writeError(w, http.StatusNotFound, "policy not found") - return - } - writeJSON(w, http.StatusOK, map[string]string{"deleted": id}) -} - -func (s *Server) executeBackup(ctx context.Context, req api.BackupRequest, requester string) (api.BackupResponse, string, error) { - req.Namespace = strings.TrimSpace(req.Namespace) - req.PVC = strings.TrimSpace(req.PVC) - if req.Namespace == "" || req.PVC == "" { - return api.BackupResponse{}, "validation_error", fmt.Errorf("namespace and pvc are required") - } - resolvedDedupe := dedupeDefault(req.Dedupe) - resolvedKeepLast := keepLastDefault(req.KeepLast) - req.Dedupe = boolPtr(resolvedDedupe) - req.KeepLast = intPtr(resolvedKeepLast) - - switch s.cfg.BackupDriver { - case "longhorn": - volumeName, _, _, err := s.client.ResolvePVCVolume(ctx, req.Namespace, req.PVC) - if err != nil { - return api.BackupResponse{}, "validation_error", err - } - - backupID := backupName("backup", req.Namespace+"-"+req.PVC) - response := api.BackupResponse{ - Driver: "longhorn", - Volume: 
// (continuation of executeBackup: remainder of the longhorn response literal —
// the head of this function lies before this chunk and is preserved unchanged)
		volumeName,
		Backup:      backupID,
		Namespace:   req.Namespace,
		RequestedBy: requester,
		DryRun:      req.DryRun,
		Dedupe:      resolvedDedupe,
		KeepLast:    resolvedKeepLast,
	}
	if req.DryRun {
		// Dry runs report what would happen without touching the backend.
		return response, "dry_run", nil
	}

	// Labels let operators trace snapshots/backups back to the originating request.
	labels := map[string]string{
		"soteria.bstein.dev/namespace":    req.Namespace,
		"soteria.bstein.dev/pvc":          req.PVC,
		"soteria.bstein.dev/requested-by": requester,
	}
	if err := s.longhorn.CreateSnapshot(ctx, volumeName, backupID, labels); err != nil {
		return api.BackupResponse{}, "backend_error", err
	}
	if _, err := s.longhorn.SnapshotBackup(ctx, volumeName, backupID, labels, s.cfg.LonghornBackupMode); err != nil {
		return api.BackupResponse{}, "backend_error", err
	}
	return response, "success", nil
	case "restic":
		// The restic driver delegates to a Kubernetes Job created by the client.
		jobName, secretName, err := s.client.CreateBackupJob(ctx, s.cfg, req)
		if err != nil {
			return api.BackupResponse{}, "backend_error", err
		}
		result := "success"
		if req.DryRun {
			result = "dry_run"
		}
		return api.BackupResponse{
			Driver:      "restic",
			JobName:     jobName,
			Namespace:   req.Namespace,
			Secret:      secretName,
			RequestedBy: requester,
			DryRun:      req.DryRun,
			Dedupe:      resolvedDedupe,
			KeepLast:    resolvedKeepLast,
		}, result, nil
	default:
		return api.BackupResponse{}, "unsupported_driver", fmt.Errorf("unsupported backup driver")
	}
}

// executeRestore performs a restore test for the configured backup driver.
// It returns the response payload, a result label used for metrics/status
// mapping (e.g. "success", "dry_run", "validation_error", "conflict",
// "backend_error", "unsupported_driver"), and an error when the result is
// not successful.
func (s *Server) executeRestore(ctx context.Context, req api.RestoreTestRequest, requester string) (api.RestoreTestResponse, string, error) {
	// Normalize all user-supplied identifiers up front.
	req.Namespace = strings.TrimSpace(req.Namespace)
	req.PVC = strings.TrimSpace(req.PVC)
	req.TargetNamespace = strings.TrimSpace(req.TargetNamespace)
	req.TargetPVC = strings.TrimSpace(req.TargetPVC)
	req.BackupURL = strings.TrimSpace(req.BackupURL)
	req.Snapshot = strings.TrimSpace(req.Snapshot)

	// Default the restore target to the source namespace.
	if req.TargetNamespace == "" {
		req.TargetNamespace = req.Namespace
	}

	switch s.cfg.BackupDriver {
	case "longhorn":
		// Refuse to clobber an existing PVC at the restore target.
		exists, err := s.client.PersistentVolumeClaimExists(ctx, req.TargetNamespace, req.TargetPVC)
		if err != nil {
			return api.RestoreTestResponse{}, "validation_error", err
		}
		if exists {
			return api.RestoreTestResponse{}, "conflict", fmt.Errorf("target pvc %s/%s already exists", req.TargetNamespace, req.TargetPVC)
		}

		volumeName, _, _, err := s.client.ResolvePVCVolume(ctx, req.Namespace, req.PVC)
		if err != nil {
			return api.RestoreTestResponse{}, "validation_error", err
		}

		// If the caller did not pin a backup URL, resolve it from the snapshot.
		backupURL := req.BackupURL
		if backupURL == "" {
			backup, err := s.longhorn.FindBackup(ctx, volumeName, req.Snapshot)
			if err != nil {
				return api.RestoreTestResponse{}, "validation_error", err
			}
			backupURL = strings.TrimSpace(backup.URL)
		}
		if backupURL == "" {
			return api.RestoreTestResponse{}, "validation_error", fmt.Errorf("backup_url is required")
		}

		restoreVolumeName := backupName("restore", req.TargetNamespace+"-"+req.TargetPVC)
		response := api.RestoreTestResponse{
			Driver:          "longhorn",
			Volume:          restoreVolumeName,
			TargetNamespace: req.TargetNamespace,
			TargetPVC:       req.TargetPVC,
			BackupURL:       backupURL,
			Namespace:       req.Namespace,
			RequestedBy:     requester,
			DryRun:          req.DryRun,
		}
		if req.DryRun {
			return response, "dry_run", nil
		}

		sourceVolume, err := s.longhorn.GetVolume(ctx, volumeName)
		if err != nil {
			return api.RestoreTestResponse{}, "backend_error", err
		}
		// Mirror the source volume's replica count; fall back to 2 when unset.
		replicas := sourceVolume.NumberOfReplicas
		if replicas == 0 {
			replicas = 2
		}

		if _, err := s.longhorn.CreateVolumeFromBackup(ctx, restoreVolumeName, sourceVolume.Size, replicas, backupURL); err != nil {
			return api.RestoreTestResponse{}, "backend_error", err
		}
		if err := s.longhorn.CreatePVC(ctx, restoreVolumeName, req.TargetNamespace, req.TargetPVC); err != nil {
			// Best-effort cleanup of the freshly restored volume; surface both
			// errors if the cleanup itself fails.
			cleanupErr := s.longhorn.DeleteVolume(ctx, restoreVolumeName)
			if cleanupErr != nil {
				log.Printf("restore cleanup failed for %s: %v", restoreVolumeName, cleanupErr)
				return api.RestoreTestResponse{}, "backend_error", fmt.Errorf("create restore pvc: %v (cleanup failed: %v)", err, cleanupErr)
			}
			return api.RestoreTestResponse{}, "backend_error", fmt.Errorf("create restore pvc: %v", err)
		}
		return response, "success", nil
	case "restic":
		// A backup URL may encode the repository + snapshot selector; decoded
		// values only fill fields the caller left blank.
		if repo, snapshot, ok := decodeResticSelector(req.BackupURL); ok {
			if strings.TrimSpace(req.Snapshot) == "" {
				req.Snapshot = snapshot
			}
			if strings.TrimSpace(req.Repository) == "" {
				req.Repository = repo
			}
		}
		jobName, secretName, err := s.client.CreateRestoreJob(ctx, s.cfg, req)
		if err != nil {
			return api.RestoreTestResponse{}, "backend_error", err
		}
		result := "success"
		if req.DryRun {
			result = "dry_run"
		}
		return api.RestoreTestResponse{
			Driver:          "restic",
			JobName:         jobName,
			Namespace:       req.Namespace,
			TargetNamespace: req.TargetNamespace,
			TargetPVC:       req.TargetPVC,
			Secret:          secretName,
			RequestedBy:     requester,
			DryRun:          req.DryRun,
		}, result, nil
	default:
		return api.RestoreTestResponse{}, "unsupported_driver", fmt.Errorf("unsupported backup driver")
	}
}

// listNamespaceBoundPVCs returns the bound PVCs restricted to one namespace.
func (s *Server) listNamespaceBoundPVCs(ctx context.Context, namespace string) ([]k8s.PVCSummary, error) {
	items, err := s.client.ListBoundPVCs(ctx)
	if err != nil {
		return nil, err
	}
	filtered := make([]k8s.PVCSummary, 0, len(items))
	for _, item := range items {
		if item.Namespace == namespace {
			filtered = append(filtered, item)
		}
	}
	return filtered, nil
}

// backupStatusCode maps an executeBackup result label to an HTTP status code.
func backupStatusCode(result string) int {
	switch result {
	case "validation_error", "unsupported_driver":
		return http.StatusBadRequest
	case "backend_error":
		return http.StatusBadGateway
	default:
		return http.StatusInternalServerError
	}
}

// restoreStatusCode maps an executeRestore result label to an HTTP status code.
func restoreStatusCode(result string) int {
	switch result {
	case "validation_error", "unsupported_driver":
		return http.StatusBadRequest
	case "conflict":
		return http.StatusConflict
	case "backend_error":
		return http.StatusBadGateway
	default:
		return http.StatusInternalServerError
	}
}

// namespaceResultStatus summarizes a namespace-wide backup run from its
// per-PVC success/failure counts.
func namespaceResultStatus(dryRun bool, total, succeeded, failed int) string {
	if dryRun {
		return "dry_run"
	}
	if total == 0 {
		return "empty"
	}
	if failed == 0 {
		return "success"
	}
	if succeeded == 0 {
		return "failed"
	}
	return "partial"
}
// targetPVCName builds a DNS-safe restore-target PVC name from a prefix and
// the source PVC name, falling back to "restore" whenever sanitization leaves
// nothing usable, and capping the result at the 63-character label limit.
func targetPVCName(prefix, sourcePVC string) string {
	prefix = sanitizeName(prefix)
	if prefix == "" {
		prefix = "restore"
	}
	if !strings.HasSuffix(prefix, "-") {
		prefix += "-"
	}
	name := sanitizeName(prefix + sourcePVC)
	if name == "" {
		name = "restore"
	}
	if len(name) > 63 {
		// Truncation can leave a trailing '-', which Trim removes.
		name = strings.Trim(name[:63], "-")
	}
	if name == "" {
		name = "restore"
	}
	return name
}

// buildInventory assembles the full PVC backup inventory, grouped by
// namespace and sorted by namespace name for deterministic output.
func (s *Server) buildInventory(ctx context.Context) (api.InventoryResponse, error) {
	pvcs, err := s.client.ListBoundPVCs(ctx)
	if err != nil {
		return api.InventoryResponse{}, err
	}

	// For restic, job listings are prefetched per namespace to avoid one
	// API call per PVC inside the enrichment loop.
	resticJobsByPVC, resticLookupErrors := s.prefetchResticBackupJobs(ctx, pvcs)

	groups := make(map[string][]api.PVCInventory)
	for _, summary := range pvcs {
		entry := api.PVCInventory{
			Namespace:    summary.Namespace,
			PVC:          summary.Name,
			Volume:       summary.VolumeName,
			Phase:        summary.Phase,
			StorageClass: summary.StorageClass,
			Capacity:     summary.Capacity,
			AccessModes:  summary.AccessModes,
			Driver:       s.cfg.BackupDriver,
		}
		s.enrichPVCInventory(ctx, &entry, resticJobsByPVC, resticLookupErrors)
		groups[summary.Namespace] = append(groups[summary.Namespace], entry)
	}

	namespaceNames := make([]string, 0, len(groups))
	for namespace := range groups {
		namespaceNames = append(namespaceNames, namespace)
	}
	sort.Strings(namespaceNames)

	response := api.InventoryResponse{
		GeneratedAt: time.Now().UTC().Format(time.RFC3339),
		Namespaces:  make([]api.NamespaceInventory, 0, len(namespaceNames)),
	}
	for _, namespace := range namespaceNames {
		response.Namespaces = append(response.Namespaces, api.NamespaceInventory{
			Name: namespace,
			PVCs: groups[namespace],
		})
	}
	return response, nil
}

// prefetchResticBackupJobs lists backup jobs once per namespace and indexes
// them by "namespace/pvc", newest first. Namespaces whose listing failed are
// recorded in the returned error map instead of aborting the whole inventory.
// Returns (nil, nil) when the driver is not restic.
func (s *Server) prefetchResticBackupJobs(ctx context.Context, pvcs []k8s.PVCSummary) (map[string][]k8s.BackupJobSummary, map[string]error) {
	if s.cfg.BackupDriver != "restic" {
		return nil, nil
	}

	namespaces := map[string]struct{}{}
	for _, pvc := range pvcs {
		namespaces[pvc.Namespace] = struct{}{}
	}

	namespaceNames := make([]string, 0, len(namespaces))
	for namespace := range namespaces {
		namespaceNames = append(namespaceNames, namespace)
	}
	sort.Strings(namespaceNames)

	jobsByPVC := map[string][]k8s.BackupJobSummary{}
	lookupErrors := map[string]error{}
	for _, namespace := range namespaceNames {
		jobs, err := s.client.ListBackupJobs(ctx, namespace)
		if err != nil {
			lookupErrors[namespace] = err
			continue
		}
		for _, job := range jobs {
			key := job.Namespace + "/" + job.PVC
			jobsByPVC[key] = append(jobsByPVC[key], job)
		}
	}

	for key := range jobsByPVC {
		sortBackupJobsNewestFirst(jobsByPVC[key])
	}

	return jobsByPVC, lookupErrors
}

// enrichPVCInventory fills in backup health/size/freshness fields on entry
// for the configured driver. For longhorn it queries backups directly; for
// restic it reads from the prefetched job index. Health is "fresh" when the
// newest completed backup is within cfg.BackupMaxAge, "stale" otherwise.
func (s *Server) enrichPVCInventory(
	ctx context.Context,
	entry *api.PVCInventory,
	resticJobsByPVC map[string][]k8s.BackupJobSummary,
	resticLookupErrors map[string]error,
) {
	switch s.cfg.BackupDriver {
	case "longhorn":
		backups, err := s.longhorn.ListBackups(ctx, entry.Volume)
		if err != nil {
			entry.Healthy = false
			entry.HealthReason = "lookup_failed"
			entry.Error = err.Error()
			return
		}
		entry.BackupCount = len(backups)
		totalBackupSize := int64(0)
		completedBackups := 0
		for _, backup := range backups {
			totalBackupSize += parseSizeBytes(backup.Size)
			if strings.EqualFold(backup.State, "Completed") {
				completedBackups++
			}
		}
		entry.CompletedBackups = completedBackups
		entry.TotalBackupSizeBytes = float64(totalBackupSize)
		latest, latestTime, ok := latestCompletedBackup(backups)
		if !ok {
			entry.Healthy = false
			if len(backups) == 0 {
				entry.HealthReason = "missing"
			} else {
				entry.HealthReason = "no_completed"
			}
			return
		}
		entry.LastBackupAt = latest.Created
		entry.LastBackupSizeBytes = float64(parseSizeBytes(latest.Size))
		if latestTime.IsZero() {
			entry.Healthy = false
			entry.HealthReason = "unknown_timestamp"
			return
		}
		entry.LastBackupAgeHours = roundHours(time.Since(latestTime).Hours())
		entry.Healthy = time.Since(latestTime) <= s.cfg.BackupMaxAge
		if entry.Healthy {
			entry.HealthReason = "fresh"
		} else {
			entry.HealthReason = "stale"
		}
	case "restic":
		// A namespace-level listing failure marks every PVC in it unhealthy.
		if err, hasErr := resticLookupErrors[entry.Namespace]; hasErr {
			entry.Healthy = false
			entry.HealthReason = "lookup_failed"
			entry.Error = err.Error()
			return
		}

		key := entry.Namespace + "/" + entry.PVC
		jobs := resticJobsByPVC[key]
		if jobs == nil {
			jobs = []k8s.BackupJobSummary{}
		}
		entry.BackupCount = len(jobs)
		// jobs are sorted newest-first by prefetchResticBackupJobs.
		if len(jobs) > 0 {
			entry.LastJobName = jobs[0].Name
			entry.LastJobState = jobs[0].State
			entry.LastJobProgressPct = backupJobProgressPct(jobs[0].State)
			if !jobs[0].CreatedAt.IsZero() {
				entry.LastJobStartedAt = jobs[0].CreatedAt.UTC().Format(time.RFC3339)
			}
		}

		completed := make([]k8s.BackupJobSummary, 0, len(jobs))
		active := 0
		for _, job := range jobs {
			if backupJobInProgress(job.State) {
				active++
			}
			if strings.EqualFold(job.State, "Completed") {
				completed = append(completed, job)
			}
		}
		entry.ActiveBackups = active
		entry.CompletedBackups = len(completed)
		// Only the retained (keep_last) completed jobs count toward usage.
		sizeSamples := completed
		if len(sizeSamples) > 0 {
			retained := sizeSamples[0].KeepLast
			if retained > 0 && retained < len(sizeSamples) {
				sizeSamples = sizeSamples[:retained]
			}
		}
		totalStoredBytes := 0.0
		storedSamples := 0
		for index, job := range sizeSamples {
			// Cap how many job logs are consulted per PVC.
			if index >= maxUsageSampleJobs {
				break
			}
			storedBytes, ok := s.lookupResticStoredBytesForJob(ctx, entry.Namespace, job.Name)
			if !ok {
				continue
			}
			if index == 0 {
				entry.LastBackupSizeBytes = storedBytes
			}
			totalStoredBytes += storedBytes
			storedSamples++
		}
		if storedSamples > 0 {
			entry.TotalBackupSizeBytes = totalStoredBytes
		}
		if len(completed) == 0 {
			entry.Healthy = false
			switch {
			case active > 0:
				entry.HealthReason = "in_progress"
			case len(jobs) == 0:
				entry.HealthReason = "missing"
			default:
				entry.HealthReason = "no_completed"
			}
			return
		}

		latestTime := backupJobTimestamp(completed[0])
		if latestTime.IsZero() {
			entry.Healthy = false
			entry.HealthReason = "unknown_timestamp"
			return
		}
		entry.LastBackupAt = latestTime.UTC().Format(time.RFC3339)
		entry.LastBackupAgeHours = roundHours(time.Since(latestTime).Hours())
		entry.Healthy = time.Since(latestTime) <= s.cfg.BackupMaxAge
		if entry.Healthy {
			entry.HealthReason = "fresh"
		} else {
			entry.HealthReason = "stale"
		}
	default:
		entry.Healthy = false
		entry.HealthReason = "unsupported_driver"
	}
}
// sortBackupJobsNewestFirst orders jobs newest-first, preferring the
// completion time and falling back to creation time; ties break on the
// lexicographically larger name so the order is deterministic.
func sortBackupJobsNewestFirst(items []k8s.BackupJobSummary) {
	sort.Slice(items, func(i, j int) bool {
		left := items[i].CompletionTime
		if left.IsZero() {
			left = items[i].CreatedAt
		}
		right := items[j].CompletionTime
		if right.IsZero() {
			right = items[j].CreatedAt
		}
		if left.Equal(right) {
			return items[i].Name > items[j].Name
		}
		return left.After(right)
	})
}

// lookupResticStoredBytesForJob resolves the bytes a restic backup job added
// to the repository. Lookup order: 15-minute in-memory cache, persisted usage
// store, then the job's pod log (parsed and persisted on success). Negative
// results are cached too, so a missing log is not re-fetched on every call.
func (s *Server) lookupResticStoredBytesForJob(ctx context.Context, namespace, jobName string) (float64, bool) {
	key := namespace + "/" + jobName

	s.jobUsageMu.RLock()
	cached, ok := s.jobUsage[key]
	s.jobUsageMu.RUnlock()
	if ok && time.Since(cached.CheckedAt) < 15*time.Minute {
		return cached.Bytes, cached.Known
	}

	if bytes, known := s.lookupPersistedResticUsage(key); known {
		entry := resticJobUsageCacheEntry{
			Known:     true,
			Bytes:     bytes,
			CheckedAt: time.Now().UTC(),
		}
		s.jobUsageMu.Lock()
		if s.jobUsage == nil {
			s.jobUsage = map[string]resticJobUsageCacheEntry{}
		}
		s.jobUsage[key] = entry
		s.jobUsageMu.Unlock()
		return bytes, true
	}

	logBody, err := s.client.ReadBackupJobLog(ctx, namespace, jobName)
	entry := resticJobUsageCacheEntry{
		Known:     false,
		Bytes:     0,
		CheckedAt: time.Now().UTC(),
	}
	if err == nil {
		if parsedBytes, parsed := parseResticStoredBytes(logBody); parsed {
			entry.Known = true
			entry.Bytes = parsedBytes
			s.storePersistedResticUsage(ctx, key, parsedBytes)
		}
	}

	s.jobUsageMu.Lock()
	if s.jobUsage == nil {
		s.jobUsage = map[string]resticJobUsageCacheEntry{}
	}
	s.jobUsage[key] = entry
	s.jobUsageMu.Unlock()

	return entry.Bytes, entry.Known
}

// parseResticStoredBytes extracts the stored-bytes figure from a restic job
// log, preferring the JSON "data added" pattern and falling back to the
// human-readable "Added to the repository" line; the last match wins since a
// log can contain several summaries.
func parseResticStoredBytes(logBody string) (float64, bool) {
	if logBody == "" {
		return 0, false
	}
	matches := resticDataAddedPattern.FindAllStringSubmatch(logBody, -1)
	if len(matches) > 0 {
		last := matches[len(matches)-1]
		if len(last) > 1 {
			if value, err := strconv.ParseFloat(strings.TrimSpace(last[1]), 64); err == nil {
				return value, true
			}
		}
	}

	textMatches := resticAddedStoredPattern.FindAllStringSubmatch(logBody, -1)
	if len(textMatches) == 0 {
		return 0, false
	}
	last := textMatches[len(textMatches)-1]
	if len(last) < 2 {
		return 0, false
	}
	return parseHumanByteSize(last[1])
}

// parseHumanByteSize converts strings like "1.270 MiB" or "3 GB" to bytes.
// It expects a value and a unit separated by whitespace; binary (KiB…TiB)
// and decimal (KB…TB) units are supported. Returns false for anything else.
func parseHumanByteSize(raw string) (float64, bool) {
	parts := strings.Fields(strings.TrimSpace(raw))
	if len(parts) < 2 {
		return 0, false
	}
	// Tolerate thousands separators in the numeric part.
	value, err := strconv.ParseFloat(strings.ReplaceAll(parts[0], ",", ""), 64)
	if err != nil {
		return 0, false
	}
	unit := strings.ToUpper(strings.TrimSpace(parts[1]))
	switch unit {
	case "B":
		return value, true
	case "KIB":
		return value * 1024, true
	case "MIB":
		return value * 1024 * 1024, true
	case "GIB":
		return value * 1024 * 1024 * 1024, true
	case "TIB":
		return value * 1024 * 1024 * 1024 * 1024, true
	case "KB":
		return value * 1000, true
	case "MB":
		return value * 1000 * 1000, true
	case "GB":
		return value * 1000 * 1000 * 1000, true
	case "TB":
		return value * 1000 * 1000 * 1000 * 1000, true
	default:
		return 0, false
	}
}

// loadResticUsage hydrates the in-memory usage store from the usage Secret.
// A missing/empty secret or an unset UsageSecretName is not an error; invalid
// entries (blank key, negative/NaN/Inf bytes) are silently dropped.
func (s *Server) loadResticUsage(ctx context.Context) error {
	if strings.TrimSpace(s.cfg.UsageSecretName) == "" {
		return nil
	}
	raw, err := s.client.LoadSecretData(ctx, s.cfg.Namespace, s.cfg.UsageSecretName, usageSecretKey)
	if err != nil {
		return err
	}
	if len(raw) == 0 {
		return nil
	}

	var doc resticPersistedUsageDocument
	if err := json.Unmarshal(raw, &doc); err != nil {
		return fmt.Errorf("decode restic usage document: %w", err)
	}
	next := map[string]resticPersistedUsageEntry{}
	for _, item := range doc.Jobs {
		key := strings.TrimSpace(item.Key)
		if key == "" || item.Bytes < 0 || math.IsNaN(item.Bytes) || math.IsInf(item.Bytes, 0) {
			continue
		}
		next[key] = resticPersistedUsageEntry{
			Bytes:     item.Bytes,
			UpdatedAt: strings.TrimSpace(item.UpdatedAt),
		}
	}

	// Replace the store wholesale under the lock.
	s.usageMu.Lock()
	s.usageStore = next
	s.usageMu.Unlock()
	return nil
}
// lookupPersistedResticUsage reads a usage entry from the persisted store,
// rejecting entries with invalid byte counts.
func (s *Server) lookupPersistedResticUsage(key string) (float64, bool) {
	s.usageMu.RLock()
	defer s.usageMu.RUnlock()
	if s.usageStore == nil {
		return 0, false
	}
	entry, ok := s.usageStore[key]
	if !ok {
		return 0, false
	}
	if entry.Bytes < 0 || math.IsNaN(entry.Bytes) || math.IsInf(entry.Bytes, 0) {
		return 0, false
	}
	return entry.Bytes, true
}

// storePersistedResticUsage upserts a usage entry and, only when the value
// actually changed, writes the store back to the Secret. Persist failures are
// logged, not returned: usage accounting is best-effort.
func (s *Server) storePersistedResticUsage(ctx context.Context, key string, value float64) {
	if key == "" || value < 0 || math.IsNaN(value) || math.IsInf(value, 0) {
		return
	}
	now := time.Now().UTC().Format(time.RFC3339)
	changed := false

	s.usageMu.Lock()
	if s.usageStore == nil {
		s.usageStore = map[string]resticPersistedUsageEntry{}
	}
	current, exists := s.usageStore[key]
	if !exists || current.Bytes != value || strings.TrimSpace(current.UpdatedAt) == "" {
		s.usageStore[key] = resticPersistedUsageEntry{
			Bytes:     value,
			UpdatedAt: now,
		}
		changed = true
	}
	s.usageMu.Unlock()

	if !changed {
		return
	}
	// Persist outside the lock; persistResticUsage takes its own read lock.
	if err := s.persistResticUsage(ctx); err != nil {
		log.Printf("persist restic usage failed: %v", err)
	}
}

// persistResticUsage serializes the usage store (sorted by key for stable
// output) and writes it into the configured usage Secret. No-op when
// UsageSecretName is unset.
func (s *Server) persistResticUsage(ctx context.Context) error {
	if strings.TrimSpace(s.cfg.UsageSecretName) == "" {
		return nil
	}
	// Snapshot under the read lock, then marshal without holding it.
	s.usageMu.RLock()
	entries := make([]struct {
		Key   string
		Value resticPersistedUsageEntry
	}, 0, len(s.usageStore))
	for key, value := range s.usageStore {
		entries = append(entries, struct {
			Key   string
			Value resticPersistedUsageEntry
		}{Key: key, Value: value})
	}
	s.usageMu.RUnlock()

	sort.Slice(entries, func(i, j int) bool {
		return entries[i].Key < entries[j].Key
	})

	doc := resticPersistedUsageDocument{
		Jobs: make([]struct {
			Key       string  `json:"key"`
			Bytes     float64 `json:"bytes"`
			UpdatedAt string  `json:"updated_at,omitempty"`
		}, 0, len(entries)),
	}
	for _, entry := range entries {
		if entry.Key == "" || entry.Value.Bytes < 0 || math.IsNaN(entry.Value.Bytes) || math.IsInf(entry.Value.Bytes, 0) {
			continue
		}
		doc.Jobs = append(doc.Jobs, struct {
			Key       string  `json:"key"`
			Bytes     float64 `json:"bytes"`
			UpdatedAt string  `json:"updated_at,omitempty"`
		}{
			Key:       entry.Key,
			Bytes:     entry.Value.Bytes,
			UpdatedAt: strings.TrimSpace(entry.Value.UpdatedAt),
		})
	}

	payload, err := json.Marshal(doc)
	if err != nil {
		return fmt.Errorf("encode restic usage document: %w", err)
	}
	return s.client.SaveSecretData(ctx, s.cfg.Namespace, s.cfg.UsageSecretName, usageSecretKey, payload, map[string]string{
		"app.kubernetes.io/name":      "soteria",
		"app.kubernetes.io/component": "usage-store",
	})
}

// refreshTelemetry rebuilds the inventory (bounded to 2 minutes) and feeds it
// into the metrics recorder; a failed build is recorded as such.
func (s *Server) refreshTelemetry(ctx context.Context) {
	refreshCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
	defer cancel()

	inventory, err := s.buildInventory(refreshCtx)
	if err != nil {
		log.Printf("inventory refresh failed: %v", err)
		s.metrics.RecordInventoryFailure()
		return
	}
	s.metrics.RecordInventory(inventory)
}

// runPolicyCycle evaluates all enabled backup policies against the current
// inventory and triggers backups for PVCs that are due. Guarded by
// beginRun/endRun so only one cycle runs at a time; the whole cycle is
// bounded to 3 minutes.
func (s *Server) runPolicyCycle(ctx context.Context) {
	if !s.beginRun() {
		return
	}
	defer s.endRun()

	policies := s.activePolicies()
	if len(policies) == 0 {
		return
	}

	runCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
	defer cancel()

	inventory, err := s.buildInventory(runCtx)
	if err != nil {
		log.Printf("policy cycle inventory failed: %v", err)
		s.metrics.RecordPolicyBackup("inventory_error")
		return
	}

	// Index the inventory for policy matching: exact PVC and whole-namespace.
	pvcMap := make(map[string]api.PVCInventory)
	namespaceMap := make(map[string][]api.PVCInventory)
	for _, group := range inventory.Namespaces {
		for _, pvc := range group.PVCs {
			key := pvc.Namespace + "/" + pvc.PVC
			pvcMap[key] = pvc
			namespaceMap[pvc.Namespace] = append(namespaceMap[pvc.Namespace], pvc)
		}
	}

	// When several policies cover one PVC, the shortest interval wins; on
	// equal intervals the stricter keep_last wins.
	type effectivePolicy struct {
		IntervalHours float64
		Dedupe        bool
		KeepLast      int
	}
	effectivePolicies := map[string]effectivePolicy{}
	for _, policy := range policies {
		matches := []api.PVCInventory{}
		if policy.PVC != "" {
			if pvc, ok := pvcMap[policy.Namespace+"/"+policy.PVC]; ok {
				matches = append(matches, pvc)
			}
		} else {
			matches = append(matches, namespaceMap[policy.Namespace]...)
		}

		for _, pvc := range matches {
			key := pvc.Namespace + "/" + pvc.PVC
			current, exists := effectivePolicies[key]
			if !exists || policy.IntervalHours < current.IntervalHours || (policy.IntervalHours == current.IntervalHours && keepLastStricter(policy.KeepLast, current.KeepLast)) {
				effectivePolicies[key] = effectivePolicy{
					IntervalHours: policy.IntervalHours,
					Dedupe:        policy.Dedupe,
					KeepLast:      policy.KeepLast,
				}
			}
		}
	}

	for key, effective := range effectivePolicies {
		pvc, ok := pvcMap[key]
		if !ok {
			continue
		}
		// Never enqueue a new policy backup while one is already active for this PVC.
		// This prevents runaway job storms when a backup is stuck Pending/Running.
		if pvc.ActiveBackups > 0 {
			s.metrics.RecordPolicyBackup("in_progress")
			continue
		}

		lastRunRef := strings.TrimSpace(pvc.LastBackupAt)
		if lastRunRef == "" {
			// If no successful backup exists yet, fall back to the most recent job start
			// so failed attempts are still throttled by interval_hours.
			lastRunRef = strings.TrimSpace(pvc.LastJobStartedAt)
		}

		if !backupDue(lastRunRef, effective.IntervalHours) {
			s.metrics.RecordPolicyBackup("not_due")
			continue
		}

		_, result, err := s.executeBackup(runCtx, api.BackupRequest{
			Namespace: pvc.Namespace,
			PVC:       pvc.PVC,
			DryRun:    false,
			Dedupe:    boolPtr(effective.Dedupe),
			KeepLast:  intPtr(effective.KeepLast),
		}, "policy-scheduler")
		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result)
		if err != nil {
			s.metrics.RecordPolicyBackup(result)
			log.Printf("policy backup failed for %s/%s: %v", pvc.Namespace, pvc.PVC, err)
			continue
		}
		s.metrics.RecordPolicyBackup("success")
	}
}
// beginRun attempts to claim the single-runner slot for a policy cycle.
// Returns false if a cycle is already in flight.
func (s *Server) beginRun() bool {
	s.runMu.Lock()
	defer s.runMu.Unlock()
	if s.running {
		return false
	}
	s.running = true
	return true
}

// endRun releases the single-runner slot claimed by beginRun.
func (s *Server) endRun() {
	s.runMu.Lock()
	defer s.runMu.Unlock()
	s.running = false
}

// loadPolicies hydrates the in-memory policy map from the policy Secret,
// normalizing each entry: blank namespaces are dropped, non-positive
// intervals fall back to defaultPolicyHours, dedupe defaults to true, and
// missing timestamps are backfilled with the current time.
func (s *Server) loadPolicies(ctx context.Context) error {
	raw, err := s.client.LoadSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey)
	if err != nil {
		return err
	}
	if len(raw) == 0 {
		return nil
	}

	var doc struct {
		Policies []struct {
			ID            string  `json:"id"`
			Namespace     string  `json:"namespace"`
			PVC           string  `json:"pvc,omitempty"`
			IntervalHours float64 `json:"interval_hours"`
			Enabled       bool    `json:"enabled"`
			Dedupe        *bool   `json:"dedupe,omitempty"`
			KeepLast      *int    `json:"keep_last,omitempty"`
			CreatedAt     string  `json:"created_at,omitempty"`
			UpdatedAt     string  `json:"updated_at,omitempty"`
		} `json:"policies"`
	}
	if err := json.Unmarshal(raw, &doc); err != nil {
		return fmt.Errorf("decode policy document: %w", err)
	}

	next := map[string]api.BackupPolicy{}
	now := time.Now().UTC().Format(time.RFC3339)
	for _, policy := range doc.Policies {
		namespace := strings.TrimSpace(policy.Namespace)
		pvc := strings.TrimSpace(policy.PVC)
		if namespace == "" {
			continue
		}
		interval := policy.IntervalHours
		if interval <= 0 {
			interval = defaultPolicyHours
		}
		dedupe := true
		if policy.Dedupe != nil {
			dedupe = *policy.Dedupe
		}
		keepLast := keepLastDefault(policy.KeepLast)
		// The stored ID is ignored; the key is always recomputed so renamed
		// scopes cannot collide.
		id := policyKey(namespace, pvc)
		createdAt := policy.CreatedAt
		if createdAt == "" {
			createdAt = now
		}
		updatedAt := policy.UpdatedAt
		if updatedAt == "" {
			updatedAt = createdAt
		}
		next[id] = api.BackupPolicy{
			ID:            id,
			Namespace:     namespace,
			PVC:           pvc,
			IntervalHours: interval,
			Enabled:       policy.Enabled,
			Dedupe:        dedupe,
			KeepLast:      keepLast,
			CreatedAt:     createdAt,
			UpdatedAt:     updatedAt,
		}
	}

	s.policyMu.Lock()
	s.policies = next
	s.policyMu.Unlock()
	return nil
}

// persistPolicies serializes the given policies into the policy Secret.
func (s *Server) persistPolicies(ctx context.Context, policies []api.BackupPolicy) error {
	doc := struct {
		Policies []api.BackupPolicy `json:"policies"`
	}{
		Policies: policies,
	}
	payload, err := json.Marshal(doc)
	if err != nil {
		return fmt.Errorf("encode policy document: %w", err)
	}
	return s.client.SaveSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey, payload, map[string]string{
		"app.kubernetes.io/name":      "soteria",
		"app.kubernetes.io/component": "policy-store",
	})
}

// listPolicies returns all policies sorted by namespace, then PVC, then ID.
func (s *Server) listPolicies() []api.BackupPolicy {
	s.policyMu.RLock()
	defer s.policyMu.RUnlock()
	policies := make([]api.BackupPolicy, 0, len(s.policies))
	for _, policy := range s.policies {
		policies = append(policies, policy)
	}
	sort.Slice(policies, func(i, j int) bool {
		if policies[i].Namespace != policies[j].Namespace {
			return policies[i].Namespace < policies[j].Namespace
		}
		if policies[i].PVC != policies[j].PVC {
			return policies[i].PVC < policies[j].PVC
		}
		return policies[i].ID < policies[j].ID
	})
	return policies
}

// activePolicies returns only the enabled policies, in listPolicies order.
func (s *Server) activePolicies() []api.BackupPolicy {
	policies := s.listPolicies()
	filtered := make([]api.BackupPolicy, 0, len(policies))
	for _, policy := range policies {
		if policy.Enabled {
			filtered = append(filtered, policy)
		}
	}
	return filtered
}
// upsertPolicy validates and stores a backup policy, persisting the whole
// policy set to the Secret. On persist failure the in-memory map is rolled
// back to its prior state, so memory and Secret never diverge.
func (s *Server) upsertPolicy(ctx context.Context, req api.BackupPolicyUpsertRequest) (api.BackupPolicy, error) {
	namespace := strings.TrimSpace(req.Namespace)
	pvc := strings.TrimSpace(req.PVC)
	if namespace == "" {
		return api.BackupPolicy{}, fmt.Errorf("namespace is required")
	}
	if err := validateKubernetesName("namespace", namespace); err != nil {
		return api.BackupPolicy{}, err
	}
	// PVC is optional: empty means the policy covers the whole namespace.
	if pvc != "" {
		if err := validateKubernetesName("pvc", pvc); err != nil {
			return api.BackupPolicy{}, err
		}
	}

	interval := req.IntervalHours
	if interval <= 0 {
		interval = defaultPolicyHours
	}
	if interval > maxPolicyIntervalHrs {
		return api.BackupPolicy{}, fmt.Errorf("interval_hours must be <= %d", maxPolicyIntervalHrs)
	}
	enabled := true
	if req.Enabled != nil {
		enabled = *req.Enabled
	}
	dedupe := dedupeDefault(req.Dedupe)
	if err := validateKeepLast(req.KeepLast); err != nil {
		return api.BackupPolicy{}, err
	}
	keepLast := keepLastDefault(req.KeepLast)

	id := policyKey(namespace, pvc)
	now := time.Now().UTC().Format(time.RFC3339)

	s.policyMu.Lock()
	// Keep a copy for rollback if persisting fails below.
	before := clonePolicyMap(s.policies)
	createdAt := now
	if existing, ok := s.policies[id]; ok && existing.CreatedAt != "" {
		createdAt = existing.CreatedAt
	}
	policy := api.BackupPolicy{
		ID:            id,
		Namespace:     namespace,
		PVC:           pvc,
		IntervalHours: interval,
		Enabled:       enabled,
		Dedupe:        dedupe,
		KeepLast:      keepLast,
		CreatedAt:     createdAt,
		UpdatedAt:     now,
	}
	s.policies[id] = policy
	snapshot := policySliceFromMap(s.policies)
	s.policyMu.Unlock()

	if err := s.persistPolicies(ctx, snapshot); err != nil {
		s.policyMu.Lock()
		s.policies = before
		s.policyMu.Unlock()
		return api.BackupPolicy{}, err
	}
	return policy, nil
}

// deletePolicy removes a policy by ID and persists the change, rolling back
// the in-memory map when the Secret write fails. The bool reports whether the
// policy existed.
func (s *Server) deletePolicy(ctx context.Context, id string) (bool, error) {
	id = strings.TrimSpace(id)
	if id == "" {
		return false, nil
	}

	s.policyMu.Lock()
	if _, ok := s.policies[id]; !ok {
		s.policyMu.Unlock()
		return false, nil
	}
	before := clonePolicyMap(s.policies)
	delete(s.policies, id)
	snapshot := policySliceFromMap(s.policies)
	s.policyMu.Unlock()

	if err := s.persistPolicies(ctx, snapshot); err != nil {
		s.policyMu.Lock()
		s.policies = before
		s.policyMu.Unlock()
		return false, err
	}
	return true, nil
}

// policyKey derives the stable policy ID "namespace__pvc"; a blank PVC scope
// becomes "_all" (whole-namespace policy).
func policyKey(namespace, pvc string) string {
	scope := strings.TrimSpace(pvc)
	if scope == "" {
		scope = "_all"
	}
	return strings.TrimSpace(namespace) + "__" + scope
}

// policySliceFromMap flattens the policy map into a slice sorted by ID.
func policySliceFromMap(source map[string]api.BackupPolicy) []api.BackupPolicy {
	out := make([]api.BackupPolicy, 0, len(source))
	for _, policy := range source {
		out = append(out, policy)
	}
	sort.Slice(out, func(i, j int) bool {
		return out[i].ID < out[j].ID
	})
	return out
}

// clonePolicyMap returns a shallow copy of the policy map (values are plain
// structs, so a shallow copy is a full copy).
func clonePolicyMap(source map[string]api.BackupPolicy) map[string]api.BackupPolicy {
	cloned := make(map[string]api.BackupPolicy, len(source))
	for key, value := range source {
		cloned[key] = value
	}
	return cloned
}

// backupDue reports whether a backup is due given the last run timestamp and
// the policy interval. Missing or unparseable timestamps count as due, so a
// PVC with no history is always backed up.
func backupDue(lastBackupAt string, intervalHours float64) bool {
	if intervalHours <= 0 {
		intervalHours = defaultPolicyHours
	}
	if strings.TrimSpace(lastBackupAt) == "" {
		return true
	}
	timestamp, ok := parseBackupTime(lastBackupAt)
	if !ok {
		return true
	}
	interval := time.Duration(intervalHours * float64(time.Hour))
	return time.Since(timestamp) >= interval
}

// authorize authenticates a request. Order: auth disabled -> allow; matching
// static bearer token -> allow as "service-token"; otherwise trust the
// oauth-proxy forwarded identity headers and, when AllowedGroups is set,
// require at least one matching group.
func (s *Server) authorize(r *http.Request) (authIdentity, int, error) {
	if !s.cfg.AuthRequired {
		return authIdentity{}, http.StatusOK, nil
	}

	authorization := strings.TrimSpace(r.Header.Get("Authorization"))
	if strings.HasPrefix(strings.ToLower(authorization), "bearer ") {
		token := strings.TrimSpace(authorization[7:])
		for _, expected := range s.cfg.AuthBearerTokens {
			if token != "" && token == expected {
				return authIdentity{Authenticated: true, User: "service-token", Groups: []string{"service-token"}}, http.StatusOK, nil
			}
		}
	}

	identity := authIdentity{
		Authenticated: true,
		User:          firstHeader(r, "X-Auth-Request-User", "X-Forwarded-User"),
		Email:         firstHeader(r, "X-Auth-Request-Email", "X-Forwarded-Email"),
		Groups:        normalizeGroups(splitGroups(firstHeader(r, "X-Auth-Request-Groups", "X-Forwarded-Groups"))),
	}
	if identity.User == "" && identity.Email == "" {
		return authIdentity{}, http.StatusUnauthorized, fmt.Errorf("authentication required")
	}
	if len(s.cfg.AllowedGroups) == 0 {
		return identity, http.StatusOK, nil
	}
	if hasAllowedGroup(identity.Groups, s.cfg.AllowedGroups) {
		return identity, http.StatusOK, nil
	}
	return authIdentity{}, http.StatusForbidden, fmt.Errorf("access requires one of: %s", strings.Join(s.cfg.AllowedGroups, ", "))
}
// requesterFromContext extracts the authIdentity stored on the request
// context; the zero identity is returned when none is present.
func requesterFromContext(ctx context.Context) authIdentity {
	identity, _ := ctx.Value(authContextKey).(authIdentity)
	return identity
}

// currentRequester renders the requesting identity as a display string,
// preferring user over email, then "authenticated", then "anonymous".
func currentRequester(ctx context.Context) string {
	identity := requesterFromContext(ctx)
	if identity.User != "" {
		return identity.User
	}
	if identity.Email != "" {
		return identity.Email
	}
	if identity.Authenticated {
		return "authenticated"
	}
	return "anonymous"
}

// authzReason maps an authorize() failure status to a metrics label.
func authzReason(status int, err error) string {
	if err == nil {
		return "unknown"
	}
	switch status {
	case http.StatusUnauthorized:
		return "unauthenticated"
	case http.StatusForbidden:
		return "forbidden_group"
	default:
		return "error"
	}
}

// hasAllowedGroup reports whether any actual group (after normalization)
// appears in the allowed set.
func hasAllowedGroup(actual, allowed []string) bool {
	allowedSet := make(map[string]struct{}, len(allowed))
	for _, group := range normalizeGroups(allowed) {
		allowedSet[group] = struct{}{}
	}
	for _, group := range normalizeGroups(actual) {
		if _, ok := allowedSet[group]; ok {
			return true
		}
	}
	return false
}

// normalizeGroups trims whitespace and a single leading "/" (the form some
// identity providers use for group paths) and drops empty values.
func normalizeGroups(values []string) []string {
	groups := make([]string, 0, len(values))
	for _, value := range values {
		value = strings.TrimSpace(strings.TrimPrefix(value, "/"))
		if value == "" {
			continue
		}
		groups = append(groups, value)
	}
	return groups
}

// firstHeader returns the first non-empty trimmed value among the named
// request headers.
func firstHeader(r *http.Request, names ...string) string {
	for _, name := range names {
		value := strings.TrimSpace(r.Header.Get(name))
		if value != "" {
			return value
		}
	}
	return ""
}

// splitGroups splits a forwarded group header on commas and semicolons.
func splitGroups(raw string) []string {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return nil
	}
	return strings.FieldsFunc(raw, func(r rune) bool {
		return r == ',' || r == ';'
	})
}

// validateKubernetesName rejects values that are not valid DNS-1123 labels.
func validateKubernetesName(field, value string) error {
	if errs := k8svalidation.IsDNS1123Label(value); len(errs) > 0 {
		return fmt.Errorf("%s must be a valid Kubernetes DNS-1123 label", field)
	}
	return nil
}

// buildBackupRecords converts Longhorn backups into API records sorted newest
// first, marking the newest completed backup as Latest. Backups with
// unparseable timestamps sort after parseable ones; ties break on name.
// Note: the sort mutates the caller's slice order.
func buildBackupRecords(backups []longhorn.Backup) []api.BackupRecord {
	records := make([]api.BackupRecord, 0, len(backups))
	latestName := ""
	if latest, _, ok := latestCompletedBackup(backups); ok {
		latestName = latest.Name
	}

	sort.Slice(backups, func(i, j int) bool {
		left, lok := parseBackupTime(backups[i].Created)
		right, rok := parseBackupTime(backups[j].Created)
		switch {
		case lok && rok:
			return left.After(right)
		case lok:
			return true
		case rok:
			return false
		default:
			return backups[i].Name > backups[j].Name
		}
	})

	for _, backup := range backups {
		records = append(records, api.BackupRecord{
			Name:         backup.Name,
			SnapshotName: backup.SnapshotName,
			Created:      backup.Created,
			State:        backup.State,
			URL:          backup.URL,
			Size:         backup.Size,
			Latest:       backup.Name == latestName,
		})
	}
	return records
}

// buildResticBackupRecords converts restic backup jobs (expected newest
// first) into API records. Only the newest completed job gets a selector URL;
// completed jobs get a human-readable size when the stored-bytes lookup
// succeeds.
func (s *Server) buildResticBackupRecords(ctx context.Context, namespace string, jobs []k8s.BackupJobSummary, defaultRepository string) []api.BackupRecord {
	records := make([]api.BackupRecord, 0, len(jobs))
	latestName := ""
	for _, job := range jobs {
		if strings.EqualFold(job.State, "Completed") {
			latestName = job.Name
			break
		}
	}

	for _, job := range jobs {
		created := ""
		if ts := backupJobTimestamp(job); !ts.IsZero() {
			created = ts.UTC().Format(time.RFC3339)
		}
		url := ""
		size := ""
		latest := job.Name == latestName
		if latest && strings.EqualFold(job.State, "Completed") {
			repository := strings.TrimSpace(job.Repository)
			if repository == "" {
				repository = strings.TrimSpace(defaultRepository)
			}
			url = encodeResticSelector(repository)
		}
		if strings.EqualFold(job.State, "Completed") {
			if bytes, ok := s.lookupResticStoredBytesForJob(ctx, namespace, job.Name); ok {
				size = formatBytesIEC(bytes)
			}
		}
		records = append(records, api.BackupRecord{
			Name:         job.Name,
			SnapshotName: job.Name,
			Created:      created,
			State:        job.State,
			URL:          url,
			Size:         size,
			Latest:       latest,
		})
	}
	return records
}
ts.UTC().Format(time.RFC3339) - } - url := "" - size := "" - latest := job.Name == latestName - if latest && strings.EqualFold(job.State, "Completed") { - repository := strings.TrimSpace(job.Repository) - if repository == "" { - repository = strings.TrimSpace(defaultRepository) - } - url = encodeResticSelector(repository) - } - if strings.EqualFold(job.State, "Completed") { - if bytes, ok := s.lookupResticStoredBytesForJob(ctx, namespace, job.Name); ok { - size = formatBytesIEC(bytes) - } - } - records = append(records, api.BackupRecord{ - Name: job.Name, - SnapshotName: job.Name, - Created: created, - State: job.State, - URL: url, - Size: size, - Latest: latest, - }) - } - return records -} - -func encodeResticSelector(repository string) string { - repository = strings.TrimSpace(repository) - if repository == "" { - return "latest" - } - return resticSelectorPrefix + base64.RawURLEncoding.EncodeToString([]byte(repository)) -} - -func decodeResticSelector(raw string) (string, string, bool) { - raw = strings.TrimSpace(raw) - if raw == "" { - return "", "", false - } - if raw == "latest" { - return "", "latest", true - } - if !strings.HasPrefix(raw, resticSelectorPrefix) { - return "", "", false - } - encoded := strings.TrimPrefix(raw, resticSelectorPrefix) - if encoded == "" { - return "", "", false - } - decoded, err := base64.RawURLEncoding.DecodeString(encoded) - if err != nil { - return "", "", false - } - repository := strings.TrimSpace(string(decoded)) - if repository == "" { - return "", "", false - } - return repository, "latest", true -} - -func backupJobTimestamp(job k8s.BackupJobSummary) time.Time { - if !job.CompletionTime.IsZero() { - return job.CompletionTime - } - return job.CreatedAt -} - -func backupJobInProgress(state string) bool { - switch strings.ToLower(strings.TrimSpace(state)) { - case "pending", "running": - return true - default: - return false - } -} - -func backupJobProgressPct(state string) int { - switch 
strings.ToLower(strings.TrimSpace(state)) { - case "pending": - return 20 - case "running": - return 70 - case "completed", "failed": - return 100 - default: - return 0 - } -} - -func latestCompletedBackup(backups []longhorn.Backup) (longhorn.Backup, time.Time, bool) { - var selected longhorn.Backup - var selectedTime time.Time - found := false - for _, backup := range backups { - if backup.State != "Completed" { - continue - } - createdAt, ok := parseBackupTime(backup.Created) - if !ok { - if !found { - selected = backup - found = true - } - continue - } - if !found || createdAt.After(selectedTime) { - selected = backup - selectedTime = createdAt - found = true - } - } - return selected, selectedTime, found -} - -func parseBackupTime(raw string) (time.Time, bool) { - layouts := []string{time.RFC3339Nano, time.RFC3339} - for _, layout := range layouts { - parsed, err := time.Parse(layout, raw) - if err == nil { - return parsed, true - } - } - return time.Time{}, false -} - -func writeJSON(w http.ResponseWriter, status int, payload any) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(status) - _ = json.NewEncoder(w).Encode(payload) -} - -func writeError(w http.ResponseWriter, status int, message string) { - writeJSON(w, status, map[string]string{"error": message}) -} - -func backupName(prefix, value string) string { - base := sanitizeName(fmt.Sprintf("soteria-%s-%s", prefix, value)) - timestamp := time.Now().UTC().Format("20060102-150405") - name := fmt.Sprintf("%s-%s", base, timestamp) - if len(name) <= 63 { - return name - } - maxBase := 63 - len(timestamp) - 1 - if maxBase < 1 { - maxBase = 1 - } - if len(base) > maxBase { - base = base[:maxBase] - } - return fmt.Sprintf("%s-%s", base, timestamp) -} - -func sanitizeName(value string) string { - value = strings.ToLower(value) - value = strings.ReplaceAll(value, "_", "-") - value = strings.ReplaceAll(value, ".", "-") - value = strings.ReplaceAll(value, " ", "-") - value = strings.Trim(value, 
"-") - return value -} - -func roundHours(value float64) float64 { - return math.Round(value*100) / 100 -} - -func parseSizeBytes(raw string) int64 { - raw = strings.TrimSpace(raw) - if raw == "" { - return 0 - } - if value, err := strconv.ParseInt(raw, 10, 64); err == nil { - return value - } - if value, err := strconv.ParseFloat(raw, 64); err == nil { - if value < 0 { - return 0 - } - return int64(value) - } - if quantity, err := resource.ParseQuantity(raw); err == nil { - return quantity.Value() - } - return 0 -} - -func formatBytesIEC(value float64) string { - if value <= 0 || math.IsNaN(value) || math.IsInf(value, 0) { - return "0 B" - } - units := []string{"B", "KiB", "MiB", "GiB", "TiB"} - size := value - unit := 0 - for size >= 1024 && unit < len(units)-1 { - size /= 1024 - unit++ - } - if unit == 0 { - return fmt.Sprintf("%.0f %s", size, units[unit]) - } - return fmt.Sprintf("%.2f %s", size, units[unit]) -} - -func dedupeDefault(value *bool) bool { - if value == nil { - return true - } - return *value -} - -func boolPtr(value bool) *bool { - ptr := value - return &ptr -} - -func keepLastDefault(value *int) int { - if value == nil { - return 0 - } - if *value < 0 { - return 0 - } - return *value -} - -func intPtr(value int) *int { - ptr := value - return &ptr -} - -func validateKeepLast(value *int) error { - if value == nil { - return nil - } - if *value < 0 { - return fmt.Errorf("keep_last must be >= 0") - } - if *value > maxPolicyKeepLast { - return fmt.Errorf("keep_last must be <= %d", maxPolicyKeepLast) - } - return nil -} - -func keepLastStricter(candidate, current int) bool { - switch { - case candidate > 0 && current == 0: - return true - case candidate == 0: - return false - case current == 0: - return true - default: - return candidate < current - } -} diff --git a/internal/server/server_test.go b/internal/server/server_test.go index f59148b..016dbf3 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -175,6 +175,152 
@@ func (f *fakeLonghornClient) ListBackups(_ context.Context, volumeName string) ( return f.backups, nil } +func TestNewInitializesCoreServerFields(t *testing.T) { + srv := New(&config.Config{}, nil, nil) + + if srv == nil { + t.Fatalf("expected server instance") + } + if srv.metrics == nil { + t.Fatalf("expected telemetry to be initialized") + } + if srv.ui == nil { + t.Fatalf("expected UI renderer to be initialized") + } + if srv.handler == nil { + t.Fatalf("expected handler to be initialized") + } + if srv.policies == nil || srv.jobUsage == nil || srv.usageStore == nil { + t.Fatalf("expected server maps to be initialized: %#v", srv) + } +} + +func TestHealthAndReadyEndpoints(t *testing.T) { + srv := New(&config.Config{}, nil, nil) + + testCases := []struct { + path string + status string + }{ + {path: "/healthz", status: "ok"}, + {path: "/readyz", status: "ready"}, + } + + for _, tc := range testCases { + req := httptest.NewRequest(http.MethodGet, tc.path, nil) + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusOK { + t.Fatalf("%s expected 200, got %d: %s", tc.path, res.Code, res.Body.String()) + } + + var payload map[string]string + if err := json.Unmarshal(res.Body.Bytes(), &payload); err != nil { + t.Fatalf("%s decode response: %v", tc.path, err) + } + if payload["status"] != tc.status { + t.Fatalf("%s expected status %q, got %#v", tc.path, tc.status, payload) + } + } +} + +func TestWhoAmIReturnsForwardedIdentity(t *testing.T) { + srv := New(&config.Config{AuthRequired: true}, nil, nil) + + req := httptest.NewRequest(http.MethodGet, "/v1/whoami", nil) + req.Header.Set("X-Forwarded-User", "brad") + req.Header.Set("X-Forwarded-Email", "brad@bstein.dev") + req.Header.Set("X-Forwarded-Groups", "/ops,/dev") + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", res.Code, res.Body.String()) + } + + var payload api.AuthInfoResponse + 
if err := json.Unmarshal(res.Body.Bytes(), &payload); err != nil { + t.Fatalf("decode whoami response: %v", err) + } + if !payload.Authenticated || payload.User != "brad" || payload.Email != "brad@bstein.dev" { + t.Fatalf("unexpected whoami payload: %#v", payload) + } + if len(payload.Groups) != 2 || payload.Groups[0] != "ops" || payload.Groups[1] != "dev" { + t.Fatalf("unexpected whoami groups: %#v", payload.Groups) + } +} + +func TestWhoAmIRejectsUnsupportedMethod(t *testing.T) { + srv := New(&config.Config{}, nil, nil) + + req := httptest.NewRequest(http.MethodPost, "/v1/whoami", nil) + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusMethodNotAllowed { + t.Fatalf("expected 405, got %d: %s", res.Code, res.Body.String()) + } +} + +func TestRootRejectsUnsupportedMethod(t *testing.T) { + srv := New(&config.Config{}, nil, nil) + + req := httptest.NewRequest(http.MethodPost, "/", nil) + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusMethodNotAllowed { + t.Fatalf("expected 405, got %d: %s", res.Code, res.Body.String()) + } +} + +func TestRootFailsWhenUIRendererUnavailable(t *testing.T) { + srv := New(&config.Config{}, nil, nil) + srv.ui = nil + + req := httptest.NewRequest(http.MethodGet, "/", nil) + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusInternalServerError { + t.Fatalf("expected 500, got %d: %s", res.Code, res.Body.String()) + } +} + +func TestStartSeedsInitialBackgroundState(t *testing.T) { + srv := &Server{ + cfg: &config.Config{ + AuthRequired: false, + BackupDriver: "longhorn", + BackupMaxAge: 24 * time.Hour, + MetricsRefreshInterval: time.Hour, + PolicyEvalInterval: time.Hour, + B2Enabled: false, + Namespace: "maintenance", + PolicySecretName: "soteria-policies", + UsageSecretName: "", + }, + client: &fakeKubeClient{}, + longhorn: &fakeLonghornClient{}, + metrics: newTelemetry(), + ui: newUIRenderer(), + 
policies: map[string]api.BackupPolicy{}, + jobUsage: map[string]resticJobUsageCacheEntry{}, + usageStore: map[string]resticPersistedUsageEntry{}, + } + srv.handler = http.HandlerFunc(srv.route) + + ctx, cancel := context.WithCancel(context.Background()) + srv.Start(ctx) + cancel() + + b2Usage := srv.getB2Usage() + if b2Usage.Enabled { + t.Fatalf("expected B2 usage to remain disabled, got %#v", b2Usage) + } +} + func TestProtectedInventoryRequiresAuth(t *testing.T) { srv := &Server{ cfg: &config.Config{AuthRequired: true, AllowedGroups: []string{"admin", "maintenance"}, BackupDriver: "longhorn"}, diff --git a/internal/server/server_utilities.go b/internal/server/server_utilities.go new file mode 100644 index 0000000..d5dcbb4 --- /dev/null +++ b/internal/server/server_utilities.go @@ -0,0 +1,336 @@ +package server + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "math" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "scm.bstein.dev/bstein/soteria/internal/api" + "scm.bstein.dev/bstein/soteria/internal/k8s" + "scm.bstein.dev/bstein/soteria/internal/longhorn" + + "k8s.io/apimachinery/pkg/api/resource" + k8svalidation "k8s.io/apimachinery/pkg/util/validation" +) + +func validateKubernetesName(field, value string) error { + if errs := k8svalidation.IsDNS1123Label(value); len(errs) > 0 { + return fmt.Errorf("%s must be a valid Kubernetes DNS-1123 label", field) + } + return nil +} + +func buildBackupRecords(backups []longhorn.Backup) []api.BackupRecord { + records := make([]api.BackupRecord, 0, len(backups)) + latestName := "" + if latest, _, ok := latestCompletedBackup(backups); ok { + latestName = latest.Name + } + + sort.Slice(backups, func(i, j int) bool { + left, lok := parseBackupTime(backups[i].Created) + right, rok := parseBackupTime(backups[j].Created) + switch { + case lok && rok: + return left.After(right) + case lok: + return true + case rok: + return false + default: + return backups[i].Name > backups[j].Name + } + }) + + for 
_, backup := range backups { + records = append(records, api.BackupRecord{ + Name: backup.Name, + SnapshotName: backup.SnapshotName, + Created: backup.Created, + State: backup.State, + URL: backup.URL, + Size: backup.Size, + Latest: backup.Name == latestName, + }) + } + return records +} + +func (s *Server) buildResticBackupRecords(ctx context.Context, namespace string, jobs []k8s.BackupJobSummary, defaultRepository string) []api.BackupRecord { + records := make([]api.BackupRecord, 0, len(jobs)) + latestName := "" + for _, job := range jobs { + if strings.EqualFold(job.State, "Completed") { + latestName = job.Name + break + } + } + + for _, job := range jobs { + created := "" + if ts := backupJobTimestamp(job); !ts.IsZero() { + created = ts.UTC().Format(time.RFC3339) + } + url := "" + size := "" + latest := job.Name == latestName + if latest && strings.EqualFold(job.State, "Completed") { + repository := strings.TrimSpace(job.Repository) + if repository == "" { + repository = strings.TrimSpace(defaultRepository) + } + url = encodeResticSelector(repository) + } + if strings.EqualFold(job.State, "Completed") { + if bytes, ok := s.lookupResticStoredBytesForJob(ctx, namespace, job.Name); ok { + size = formatBytesIEC(bytes) + } + } + records = append(records, api.BackupRecord{ + Name: job.Name, + SnapshotName: job.Name, + Created: created, + State: job.State, + URL: url, + Size: size, + Latest: latest, + }) + } + return records +} + +func encodeResticSelector(repository string) string { + repository = strings.TrimSpace(repository) + if repository == "" { + return "latest" + } + return resticSelectorPrefix + base64.RawURLEncoding.EncodeToString([]byte(repository)) +} + +func decodeResticSelector(raw string) (string, string, bool) { + raw = strings.TrimSpace(raw) + if raw == "" { + return "", "", false + } + if raw == "latest" { + return "", "latest", true + } + if !strings.HasPrefix(raw, resticSelectorPrefix) { + return "", "", false + } + encoded := 
strings.TrimPrefix(raw, resticSelectorPrefix) + if encoded == "" { + return "", "", false + } + decoded, err := base64.RawURLEncoding.DecodeString(encoded) + if err != nil { + return "", "", false + } + repository := strings.TrimSpace(string(decoded)) + if repository == "" { + return "", "", false + } + return repository, "latest", true +} + +func backupJobTimestamp(job k8s.BackupJobSummary) time.Time { + if !job.CompletionTime.IsZero() { + return job.CompletionTime + } + return job.CreatedAt +} + +func backupJobInProgress(state string) bool { + switch strings.ToLower(strings.TrimSpace(state)) { + case "pending", "running": + return true + default: + return false + } +} + +func backupJobProgressPct(state string) int { + switch strings.ToLower(strings.TrimSpace(state)) { + case "pending": + return 20 + case "running": + return 70 + case "completed", "failed": + return 100 + default: + return 0 + } +} + +func latestCompletedBackup(backups []longhorn.Backup) (longhorn.Backup, time.Time, bool) { + var selected longhorn.Backup + var selectedTime time.Time + found := false + for _, backup := range backups { + if backup.State != "Completed" { + continue + } + createdAt, ok := parseBackupTime(backup.Created) + if !ok { + if !found { + selected = backup + found = true + } + continue + } + if !found || createdAt.After(selectedTime) { + selected = backup + selectedTime = createdAt + found = true + } + } + return selected, selectedTime, found +} + +func parseBackupTime(raw string) (time.Time, bool) { + layouts := []string{time.RFC3339Nano, time.RFC3339} + for _, layout := range layouts { + parsed, err := time.Parse(layout, raw) + if err == nil { + return parsed, true + } + } + return time.Time{}, false +} + +func writeJSON(w http.ResponseWriter, status int, payload any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(payload) +} + +func writeError(w http.ResponseWriter, status int, message string) { + writeJSON(w, 
status, map[string]string{"error": message}) +} + +func backupName(prefix, value string) string { + base := sanitizeName(fmt.Sprintf("soteria-%s-%s", prefix, value)) + timestamp := time.Now().UTC().Format("20060102-150405") + name := fmt.Sprintf("%s-%s", base, timestamp) + if len(name) <= 63 { + return name + } + maxBase := 63 - len(timestamp) - 1 + if maxBase < 1 { + maxBase = 1 + } + if len(base) > maxBase { + base = base[:maxBase] + } + return fmt.Sprintf("%s-%s", base, timestamp) +} + +func sanitizeName(value string) string { + value = strings.ToLower(value) + value = strings.ReplaceAll(value, "_", "-") + value = strings.ReplaceAll(value, ".", "-") + value = strings.ReplaceAll(value, " ", "-") + value = strings.Trim(value, "-") + return value +} + +func roundHours(value float64) float64 { + return math.Round(value*100) / 100 +} + +func parseSizeBytes(raw string) int64 { + raw = strings.TrimSpace(raw) + if raw == "" { + return 0 + } + if value, err := strconv.ParseInt(raw, 10, 64); err == nil { + return value + } + if value, err := strconv.ParseFloat(raw, 64); err == nil { + if value < 0 { + return 0 + } + return int64(value) + } + if quantity, err := resource.ParseQuantity(raw); err == nil { + return quantity.Value() + } + return 0 +} + +func formatBytesIEC(value float64) string { + if value <= 0 || math.IsNaN(value) || math.IsInf(value, 0) { + return "0 B" + } + units := []string{"B", "KiB", "MiB", "GiB", "TiB"} + size := value + unit := 0 + for size >= 1024 && unit < len(units)-1 { + size /= 1024 + unit++ + } + if unit == 0 { + return fmt.Sprintf("%.0f %s", size, units[unit]) + } + return fmt.Sprintf("%.2f %s", size, units[unit]) +} + +func dedupeDefault(value *bool) bool { + if value == nil { + return true + } + return *value +} + +func boolPtr(value bool) *bool { + ptr := value + return &ptr +} + +func keepLastDefault(value *int) int { + if value == nil { + return 0 + } + if *value < 0 { + return 0 + } + return *value +} + +func intPtr(value int) *int { + 
ptr := value + return &ptr +} + +func validateKeepLast(value *int) error { + if value == nil { + return nil + } + if *value < 0 { + return fmt.Errorf("keep_last must be >= 0") + } + if *value > maxPolicyKeepLast { + return fmt.Errorf("keep_last must be <= %d", maxPolicyKeepLast) + } + return nil +} + +func keepLastStricter(candidate, current int) bool { + switch { + case candidate > 0 && current == 0: + return true + case candidate == 0: + return false + case current == 0: + return true + default: + return candidate < current + } +} diff --git a/scripts/loc_hygiene_waivers.tsv b/scripts/loc_hygiene_waivers.tsv index 3f19ae8..c4feeeb 100644 --- a/scripts/loc_hygiene_waivers.tsv +++ b/scripts/loc_hygiene_waivers.tsv @@ -1,2 +1 @@ # relative_path max_lines reason -internal/server/server.go 2203 legacy-oversize