package server import ( "context" "encoding/json" "fmt" "log" "math" "net/http" "sort" "strings" "time" "scm.bstein.dev/bstein/soteria/internal/api" "scm.bstein.dev/bstein/soteria/internal/config" "scm.bstein.dev/bstein/soteria/internal/k8s" "scm.bstein.dev/bstein/soteria/internal/longhorn" corev1 "k8s.io/api/core/v1" k8svalidation "k8s.io/apimachinery/pkg/util/validation" ) type kubeClient interface { ResolvePVCVolume(ctx context.Context, namespace, pvcName string) (string, *corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error) CreateBackupJob(ctx context.Context, cfg *config.Config, req api.BackupRequest) (string, string, error) CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error) ListBoundPVCs(ctx context.Context) ([]k8s.PVCSummary, error) PersistentVolumeClaimExists(ctx context.Context, namespace, pvcName string) (bool, error) } type longhornClient interface { SnapshotBackup(ctx context.Context, volume, name string, labels map[string]string, backupMode string) (*longhorn.Volume, error) GetVolume(ctx context.Context, volume string) (*longhorn.Volume, error) CreateVolumeFromBackup(ctx context.Context, name, size string, replicas int, backupURL string) (*longhorn.Volume, error) CreatePVC(ctx context.Context, volumeName, namespace, pvcName string) error DeleteVolume(ctx context.Context, volumeName string) error FindBackup(ctx context.Context, volumeName, snapshot string) (*longhorn.Backup, error) ListBackups(ctx context.Context, volumeName string) ([]longhorn.Backup, error) } type Server struct { cfg *config.Config client kubeClient longhorn longhornClient metrics *telemetry handler http.Handler } type authIdentity struct { Authenticated bool User string Email string Groups []string } type ctxKey string const authContextKey ctxKey = "soteria-auth" func New(cfg *config.Config, client *k8s.Client, lh *longhorn.Client) *Server { s := &Server{ cfg: cfg, client: client, longhorn: lh, metrics: newTelemetry(), } s.handler = http.HandlerFunc(s.route) return s } func (s *Server) Start(ctx context.Context) { s.refreshTelemetry(ctx) ticker := time.NewTicker(s.cfg.MetricsRefreshInterval) go func() { defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: s.refreshTelemetry(ctx) } } }() } func (s *Server) Handler() http.Handler { return s.handler } func (s *Server) route(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/healthz": s.handleHealth(w, r) return case "/readyz": s.handleReady(w, r) return case "/metrics": s.metrics.Handler().ServeHTTP(w, r) return } identity, status, err := s.authorize(r) if err != nil { s.metrics.RecordAuthzDenied(authzReason(status, err)) writeError(w, status, err.Error()) return } r = r.WithContext(context.WithValue(r.Context(), authContextKey, identity)) switch r.URL.Path { case "/": s.handleUI(w, r) case "/v1/whoami": s.handleWhoAmI(w, r) case "/v1/inventory": s.handleInventory(w, r) case "/v1/backups": s.handleBackups(w, r) case "/v1/backup": s.handleBackup(w, r) case "/v1/restores", "/v1/restore-test": s.handleRestore(w, r) default: writeError(w, http.StatusNotFound, "not found") } } func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) { writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) } func (s *Server) handleReady(w http.ResponseWriter, _ *http.Request) { writeJSON(w, http.StatusOK, map[string]string{"status": "ready"}) } func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } w.Header().Set("Content-Type", "text/html; charset=utf-8") _, _ = w.Write([]byte(uiHTML)) } func (s *Server) handleWhoAmI(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } identity := requesterFromContext(r.Context()) writeJSON(w, http.StatusOK, api.AuthInfoResponse{ Authenticated: identity.Authenticated, User: identity.User, Email: identity.Email, Groups: identity.Groups, }) } func (s *Server) handleInventory(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } inventory, err := s.buildInventory(r.Context()) if err != nil { writeError(w, http.StatusBadGateway, err.Error()) return } writeJSON(w, http.StatusOK, inventory) } func (s *Server) handleBackups(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } namespace := strings.TrimSpace(r.URL.Query().Get("namespace")) pvcName := strings.TrimSpace(r.URL.Query().Get("pvc")) if namespace == "" || pvcName == "" { writeError(w, http.StatusBadRequest, "namespace and pvc are required") return } volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), namespace, pvcName) if err != nil { writeError(w, http.StatusBadRequest, err.Error()) return } backups, err := s.longhorn.ListBackups(r.Context(), volumeName) if err != nil { writeError(w, http.StatusBadGateway, err.Error()) return } writeJSON(w, http.StatusOK, api.BackupListResponse{ Namespace: namespace, PVC: pvcName, Volume: volumeName, Backups: buildBackupRecords(backups), }) } func (s *Server) handleBackup(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } r.Body = http.MaxBytesReader(w, r.Body, 1<<20) var req api.BackupRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "invalid_json") writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) return } if strings.TrimSpace(req.Namespace) == "" || strings.TrimSpace(req.PVC) == "" { s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "validation_error") writeError(w, http.StatusBadRequest, "namespace and pvc are required") return } requester := currentRequester(r.Context()) switch s.cfg.BackupDriver { case "longhorn": volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), req.Namespace, req.PVC) if err != nil { s.metrics.RecordBackupRequest("longhorn", "validation_error") writeError(w, http.StatusBadRequest, err.Error()) return } backupName := backupName("backup", req.Namespace+"-"+req.PVC) if req.DryRun { s.metrics.RecordBackupRequest("longhorn", "dry_run") writeJSON(w, http.StatusOK, api.BackupResponse{ Driver: "longhorn", Volume: volumeName, Backup: backupName, Namespace: req.Namespace, RequestedBy: requester, DryRun: true, }) return } labels := map[string]string{ "soteria.bstein.dev/namespace": req.Namespace, "soteria.bstein.dev/pvc": req.PVC, "soteria.bstein.dev/requested-by": requester, } if _, err := s.longhorn.SnapshotBackup(r.Context(), volumeName, backupName, labels, s.cfg.LonghornBackupMode); err != nil { s.metrics.RecordBackupRequest("longhorn", "backend_error") writeError(w, http.StatusBadGateway, err.Error()) return } s.metrics.RecordBackupRequest("longhorn", "success") writeJSON(w, http.StatusOK, api.BackupResponse{ Driver: "longhorn", Volume: volumeName, Backup: backupName, Namespace: req.Namespace, RequestedBy: requester, DryRun: false, }) case "restic": jobName, secretName, err := s.client.CreateBackupJob(r.Context(), s.cfg, req) if err != nil { s.metrics.RecordBackupRequest("restic", "backend_error") writeError(w, http.StatusBadRequest, err.Error()) return } if req.DryRun { s.metrics.RecordBackupRequest("restic", "dry_run") } else { s.metrics.RecordBackupRequest("restic", "success") } writeJSON(w, http.StatusOK, api.BackupResponse{ Driver: "restic", JobName: jobName, Namespace: req.Namespace, Secret: secretName, RequestedBy: requester, DryRun: req.DryRun, }) default: s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "unsupported_driver") writeError(w, http.StatusBadRequest, "unsupported backup driver") } } func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } r.Body = http.MaxBytesReader(w, r.Body, 1<<20) var req api.RestoreTestRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "invalid_json") writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) return } req.Namespace = strings.TrimSpace(req.Namespace) req.PVC = strings.TrimSpace(req.PVC) req.TargetPVC = strings.TrimSpace(req.TargetPVC) req.TargetNamespace = strings.TrimSpace(req.TargetNamespace) if req.Namespace == "" { s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") writeError(w, http.StatusBadRequest, "namespace is required") return } if req.PVC == "" { s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") writeError(w, http.StatusBadRequest, "pvc is required") return } if req.TargetPVC == "" { s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") writeError(w, http.StatusBadRequest, "target_pvc is required") return } if req.TargetNamespace == "" { req.TargetNamespace = req.Namespace } if err := validateKubernetesName("namespace", req.Namespace); err != nil { s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") writeError(w, http.StatusBadRequest, err.Error()) return } if err := validateKubernetesName("pvc", req.PVC); err != nil { s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") writeError(w, http.StatusBadRequest, err.Error()) return } if err := validateKubernetesName("target_namespace", req.TargetNamespace); err != nil { s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") writeError(w, http.StatusBadRequest, err.Error()) return } if err := validateKubernetesName("target_pvc", req.TargetPVC); err != nil { s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") writeError(w, http.StatusBadRequest, err.Error()) return } if req.Namespace == req.TargetNamespace && req.PVC == req.TargetPVC { s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "conflict") writeError(w, http.StatusConflict, "target namespace/pvc must differ from source") return } requester := currentRequester(r.Context()) switch s.cfg.BackupDriver { case "longhorn": exists, err := s.client.PersistentVolumeClaimExists(r.Context(), req.TargetNamespace, req.TargetPVC) if err != nil { s.metrics.RecordRestoreRequest("longhorn", "validation_error") writeError(w, http.StatusBadRequest, err.Error()) return } if exists { s.metrics.RecordRestoreRequest("longhorn", "conflict") writeError(w, http.StatusConflict, fmt.Sprintf("target pvc %s/%s already exists", req.TargetNamespace, req.TargetPVC)) return } volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), req.Namespace, req.PVC) if err != nil { s.metrics.RecordRestoreRequest("longhorn", "validation_error") writeError(w, http.StatusBadRequest, err.Error()) return } backupURL := strings.TrimSpace(req.BackupURL) if backupURL == "" { backup, err := s.longhorn.FindBackup(r.Context(), volumeName, req.Snapshot) if err != nil { s.metrics.RecordRestoreRequest("longhorn", "validation_error") writeError(w, http.StatusBadRequest, err.Error()) return } backupURL = backup.URL } if backupURL == "" { s.metrics.RecordRestoreRequest("longhorn", "validation_error") writeError(w, http.StatusBadRequest, "backup_url is required") return } restoreVolumeName := backupName("restore", req.TargetNamespace+"-"+req.TargetPVC) if req.DryRun { s.metrics.RecordRestoreRequest("longhorn", "dry_run") writeJSON(w, http.StatusOK, api.RestoreTestResponse{ Driver: "longhorn", Volume: restoreVolumeName, TargetNamespace: req.TargetNamespace, TargetPVC: req.TargetPVC, BackupURL: backupURL, Namespace: req.Namespace, RequestedBy: requester, DryRun: true, }) return } sourceVolume, err := s.longhorn.GetVolume(r.Context(), volumeName) if err != nil { s.metrics.RecordRestoreRequest("longhorn", "backend_error") writeError(w, http.StatusBadGateway, err.Error()) return } replicas := sourceVolume.NumberOfReplicas if replicas == 0 { replicas = 2 } if _, err := s.longhorn.CreateVolumeFromBackup(r.Context(), restoreVolumeName, sourceVolume.Size, replicas, backupURL); err != nil { s.metrics.RecordRestoreRequest("longhorn", "backend_error") writeError(w, http.StatusBadGateway, err.Error()) return } if err := s.longhorn.CreatePVC(r.Context(), restoreVolumeName, req.TargetNamespace, req.TargetPVC); err != nil { cleanupErr := s.longhorn.DeleteVolume(r.Context(), restoreVolumeName) if cleanupErr != nil { log.Printf("restore cleanup failed for %s: %v", restoreVolumeName, cleanupErr) writeError(w, http.StatusBadGateway, fmt.Sprintf("create restore pvc: %v (cleanup failed: %v)", err, cleanupErr)) } else { writeError(w, http.StatusBadGateway, fmt.Sprintf("create restore pvc: %v", err)) } s.metrics.RecordRestoreRequest("longhorn", "backend_error") return } s.metrics.RecordRestoreRequest("longhorn", "success") writeJSON(w, http.StatusOK, api.RestoreTestResponse{ Driver: "longhorn", Volume: restoreVolumeName, TargetNamespace: req.TargetNamespace, TargetPVC: req.TargetPVC, BackupURL: backupURL, Namespace: req.Namespace, RequestedBy: requester, DryRun: false, }) case "restic": jobName, secretName, err := s.client.CreateRestoreJob(r.Context(), s.cfg, req) if err != nil { s.metrics.RecordRestoreRequest("restic", "backend_error") writeError(w, http.StatusBadRequest, err.Error()) return } if req.DryRun { s.metrics.RecordRestoreRequest("restic", "dry_run") } else { s.metrics.RecordRestoreRequest("restic", "success") } writeJSON(w, http.StatusOK, api.RestoreTestResponse{ Driver: "restic", JobName: jobName, Namespace: req.Namespace, TargetNamespace: req.TargetNamespace, TargetPVC: req.TargetPVC, Secret: secretName, RequestedBy: requester, DryRun: req.DryRun, }) default: s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "unsupported_driver") writeError(w, http.StatusBadRequest, "unsupported backup driver") } } func (s *Server) buildInventory(ctx context.Context) (api.InventoryResponse, error) { pvcs, err := s.client.ListBoundPVCs(ctx) if err != nil { return api.InventoryResponse{}, err } groups := make(map[string][]api.PVCInventory) for _, summary := range pvcs { entry := api.PVCInventory{ Namespace: summary.Namespace, PVC: summary.Name, Volume: summary.VolumeName, Phase: summary.Phase, StorageClass: summary.StorageClass, Capacity: summary.Capacity, AccessModes: summary.AccessModes, Driver: s.cfg.BackupDriver, } s.enrichPVCInventory(ctx, &entry) groups[summary.Namespace] = append(groups[summary.Namespace], entry) } namespaceNames := make([]string, 0, len(groups)) for namespace := range groups { namespaceNames = append(namespaceNames, namespace) } sort.Strings(namespaceNames) response := api.InventoryResponse{ GeneratedAt: time.Now().UTC().Format(time.RFC3339), Namespaces: make([]api.NamespaceInventory, 0, len(namespaceNames)), } for _, namespace := range namespaceNames { response.Namespaces = append(response.Namespaces, api.NamespaceInventory{ Name: namespace, PVCs: groups[namespace], }) } return response, nil } func (s *Server) enrichPVCInventory(ctx context.Context, entry *api.PVCInventory) { switch s.cfg.BackupDriver { case "longhorn": backups, err := s.longhorn.ListBackups(ctx, entry.Volume) if err != nil { entry.Healthy = false entry.HealthReason = "lookup_failed" entry.Error = err.Error() return } entry.BackupCount = len(backups) latest, latestTime, ok := latestCompletedBackup(backups) if !ok { entry.Healthy = false if len(backups) == 0 { entry.HealthReason = "missing" } else { entry.HealthReason = "no_completed" } return } entry.LastBackupAt = latest.Created if latestTime.IsZero() { entry.Healthy = false entry.HealthReason = "unknown_timestamp" return } entry.LastBackupAgeHours = roundHours(time.Since(latestTime).Hours()) entry.Healthy = time.Since(latestTime) <= s.cfg.BackupMaxAge if entry.Healthy { entry.HealthReason = "fresh" } else { entry.HealthReason = "stale" } case "restic": entry.Healthy = false entry.HealthReason = "inventory_unavailable" entry.Error = "restic inventory telemetry is not implemented yet" default: entry.Healthy = false entry.HealthReason = "unsupported_driver" } } func (s *Server) refreshTelemetry(ctx context.Context) { refreshCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) defer cancel() inventory, err := s.buildInventory(refreshCtx) if err != nil { log.Printf("inventory refresh failed: %v", err) s.metrics.RecordInventoryFailure() return } s.metrics.RecordInventory(inventory) } func (s *Server) authorize(r *http.Request) (authIdentity, int, error) { if !s.cfg.AuthRequired { return authIdentity{}, http.StatusOK, nil } authorization := strings.TrimSpace(r.Header.Get("Authorization")) if strings.HasPrefix(strings.ToLower(authorization), "bearer ") { token := strings.TrimSpace(authorization[7:]) for _, expected := range s.cfg.AuthBearerTokens { if token != "" && token == expected { return authIdentity{Authenticated: true, User: "service-token", Groups: []string{"service-token"}}, http.StatusOK, nil } } } identity := authIdentity{ Authenticated: true, User: firstHeader(r, "X-Auth-Request-User", "X-Forwarded-User"), Email: firstHeader(r, "X-Auth-Request-Email", "X-Forwarded-Email"), Groups: normalizeGroups(splitGroups(firstHeader(r, "X-Auth-Request-Groups", "X-Forwarded-Groups"))), } if identity.User == "" && identity.Email == "" { return authIdentity{}, http.StatusUnauthorized, fmt.Errorf("authentication required") } if len(s.cfg.AllowedGroups) == 0 { return identity, http.StatusOK, nil } if hasAllowedGroup(identity.Groups, s.cfg.AllowedGroups) { return identity, http.StatusOK, nil } return authIdentity{}, http.StatusForbidden, fmt.Errorf("access requires one of: %s", strings.Join(s.cfg.AllowedGroups, ", ")) } func requesterFromContext(ctx context.Context) authIdentity { identity, _ := ctx.Value(authContextKey).(authIdentity) return identity } func currentRequester(ctx context.Context) string { identity := requesterFromContext(ctx) if identity.User != "" { return identity.User } if identity.Email != "" { return identity.Email } if identity.Authenticated { return "authenticated" } return "anonymous" } func authzReason(status int, err error) string { if err == nil { return "unknown" } switch status { case http.StatusUnauthorized: return "unauthenticated" case http.StatusForbidden: return "forbidden_group" default: return "error" } } func hasAllowedGroup(actual, allowed []string) bool { allowedSet := make(map[string]struct{}, len(allowed)) for _, group := range normalizeGroups(allowed) { allowedSet[group] = struct{}{} } for _, group := range normalizeGroups(actual) { if _, ok := allowedSet[group]; ok { return true } } return false } func normalizeGroups(values []string) []string { groups := make([]string, 0, len(values)) for _, value := range values { value = strings.TrimSpace(strings.TrimPrefix(value, "/")) if value == "" { continue } groups = append(groups, value) } return groups } func firstHeader(r *http.Request, names ...string) string { for _, name := range names { value := strings.TrimSpace(r.Header.Get(name)) if value != "" { return value } } return "" } func splitGroups(raw string) []string { raw = strings.TrimSpace(raw) if raw == "" { return nil } return strings.FieldsFunc(raw, func(r rune) bool { return r == ',' || r == ';' }) } func validateKubernetesName(field, value string) error { if errs := k8svalidation.IsDNS1123Label(value); len(errs) > 0 { return fmt.Errorf("%s must be a valid Kubernetes DNS-1123 label", field) } return nil } func buildBackupRecords(backups []longhorn.Backup) []api.BackupRecord { records := make([]api.BackupRecord, 0, len(backups)) latestName := "" if latest, _, ok := latestCompletedBackup(backups); ok { latestName = latest.Name } sort.Slice(backups, func(i, j int) bool { left, lok := parseBackupTime(backups[i].Created) right, rok := parseBackupTime(backups[j].Created) switch { case lok && rok: return left.After(right) case lok: return true case rok: return false default: return backups[i].Name > backups[j].Name } }) for _, backup := range backups { records = append(records, api.BackupRecord{ Name: backup.Name, SnapshotName: backup.SnapshotName, Created: backup.Created, State: backup.State, URL: backup.URL, Size: backup.Size, Latest: backup.Name == latestName, }) } return records } func latestCompletedBackup(backups []longhorn.Backup) (longhorn.Backup, time.Time, bool) { var selected longhorn.Backup var selectedTime time.Time found := false for _, backup := range backups { if backup.State != "Completed" { continue } createdAt, ok := parseBackupTime(backup.Created) if !ok { if !found { selected = backup found = true } continue } if !found || createdAt.After(selectedTime) { selected = backup selectedTime = createdAt found = true } } return selected, selectedTime, found } func parseBackupTime(raw string) (time.Time, bool) { layouts := []string{time.RFC3339Nano, time.RFC3339} for _, layout := range layouts { parsed, err := time.Parse(layout, raw) if err == nil { return parsed, true } } return time.Time{}, false } func writeJSON(w http.ResponseWriter, status int, payload any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(payload) } func writeError(w http.ResponseWriter, status int, message string) { writeJSON(w, status, map[string]string{"error": message}) } func backupName(prefix, value string) string { base := sanitizeName(fmt.Sprintf("soteria-%s-%s", prefix, value)) timestamp := time.Now().UTC().Format("20060102-150405") name := fmt.Sprintf("%s-%s", base, timestamp) if len(name) <= 63 { return name } maxBase := 63 - len(timestamp) - 1 if maxBase < 1 { maxBase = 1 } if len(base) > maxBase { base = base[:maxBase] } return fmt.Sprintf("%s-%s", base, timestamp) } func sanitizeName(value string) string { value = strings.ToLower(value) value = strings.ReplaceAll(value, "_", "-") value = strings.ReplaceAll(value, ".", "-") value = strings.ReplaceAll(value, " ", "-") value = strings.Trim(value, "-") return value } func roundHours(value float64) float64 { return math.Round(value*100) / 100 }