package server

import (
	"context"
	"log"
	"net/http"
	"strings"
	"sync"
	"time"

	"scm.bstein.dev/bstein/soteria/internal/api"
	"scm.bstein.dev/bstein/soteria/internal/config"
	"scm.bstein.dev/bstein/soteria/internal/k8s"
	"scm.bstein.dev/bstein/soteria/internal/longhorn"

	corev1 "k8s.io/api/core/v1"
)

// kubeClient is the set of Kubernetes operations Server depends on.
// *k8s.Client satisfies it (New stores one in Server.client). Declaring it as
// a consumer-side interface presumably lets tests substitute a fake — confirm.
type kubeClient interface {
	// ResolvePVCVolume returns, for namespace/pvcName, a string (presumably the
	// backing volume name — confirm in internal/k8s) plus the PVC and PV objects.
	ResolvePVCVolume(ctx context.Context, namespace, pvcName string) (string, *corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error)
	// NOTE(review): the meaning of the two string results of the Create*Job
	// methods is not visible here (presumably job name plus namespace or
	// similar) — confirm in internal/k8s.
	CreateBackupJob(ctx context.Context, cfg *config.Config, req api.BackupRequest) (string, string, error)
	CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error)
	ListBackupJobs(ctx context.Context, namespace string) ([]k8s.BackupJobSummary, error)
	ListBackupJobsForPVC(ctx context.Context, namespace, pvc string) ([]k8s.BackupJobSummary, error)
	ReadBackupJobLog(ctx context.Context, namespace, jobName string) (string, error)
	ListBoundPVCs(ctx context.Context) ([]k8s.PVCSummary, error)
	PersistentVolumeClaimExists(ctx context.Context, namespace, pvcName string) (bool, error)
	// LoadSecretData / SaveSecretData appear to read and write one data key of
	// a named Secret; presumably the server persists its state through them
	// under policySecretKey and usageSecretKey — confirm at the call sites.
	LoadSecretData(ctx context.Context, namespace, secretName, key string) ([]byte, error)
	SaveSecretData(ctx context.Context, namespace, secretName, key string, value []byte, labels map[string]string) error
}

// longhornClient is the set of Longhorn operations Server depends on.
// *longhorn.Client satisfies it (New stores one in Server.longhorn).
type longhornClient interface {
	CreateSnapshot(ctx context.Context, volume, name string, labels map[string]string) error
	// NOTE(review): accepted backupMode values are not visible here — confirm
	// in internal/longhorn.
	SnapshotBackup(ctx context.Context, volume, name string, labels map[string]string, backupMode string) (*longhorn.Volume, error)
	GetVolume(ctx context.Context, volume string) (*longhorn.Volume, error)
	CreateVolumeFromBackup(ctx context.Context, name, size string, replicas int, backupURL string) (*longhorn.Volume, error)
	CreatePVC(ctx context.Context, volumeName, namespace, pvcName string) error
	DeleteVolume(ctx context.Context, volumeName string) error
	FindBackup(ctx context.Context, volumeName, snapshot string) (*longhorn.Backup, error)
	ListBackups(ctx context.Context, volumeName string) ([]longhorn.Backup, error)
}

// Server owns HTTP routing, policy state, telemetry, and the UI renderer.
// Construct it with New: the zero value has nil maps and a nil handler.
type Server struct {
	cfg      *config.Config
	client   kubeClient
	longhorn longhornClient
	metrics  *telemetry
	handler  http.Handler // New sets this to http.HandlerFunc(s.route)
	ui       *uiRenderer

	// Each mutex below is declared next to the state it presumably guards;
	// confirm against the use sites.
	policyMu sync.RWMutex
	policies map[string]api.BackupPolicy // presumably keyed by policy ID

	runMu   sync.Mutex
	running bool // presumably set while a policy cycle is in progress

	b2Mu    sync.RWMutex
	b2Usage api.B2UsageResponse

	// NOTE(review): unlike the other pairs, the map is declared before its
	// mutex here; the order is cosmetic only.
	jobUsage   map[string]resticJobUsageCacheEntry
	jobUsageMu sync.RWMutex

	usageMu    sync.RWMutex
	usageStore map[string]resticPersistedUsageEntry
}

// authIdentity describes the requester. route stores the identity returned by
// authorize in the request context under authContextKey; presumably that value
// is an authIdentity that requesterFromContext reads back — confirm.
type authIdentity struct {
	Authenticated bool
	User          string
	Email         string
	Groups        []string
}

// ctxKey is an unexported context-key type, so values this package stores in
// a context cannot collide with keys defined by other packages.
type ctxKey string

// authContextKey is the request-context key under which route stores the
// requester identity.
const authContextKey ctxKey = "soteria-auth"

const (
	// Data keys for persisted state (presumably used with
	// LoadSecretData/SaveSecretData — confirm).
	policySecretKey = "policies.json"
	usageSecretKey  = "restic-job-usage.json"

	// Policy defaults and bounds; the names indicate hours. 24 * 365 is the
	// number of hours in a 365-day year.
	defaultPolicyHours   = 24.0
	maxPolicyIntervalHrs = 24 * 365
	maxPolicyKeepLast    = 1000

	// Presumably caps how many restic jobs are sampled for usage — confirm.
	maxUsageSampleJobs = 20

	// Presumably the prefix of a "latest restic backup" selector — confirm.
	resticSelectorPrefix = "restic-latest:"
)

// resticJobUsageCacheEntry is an in-memory usage sample for one restic job.
// Known presumably distinguishes a measured value from "not determined" (Bytes
// == 0 alone would be ambiguous), and CheckedAt presumably records when the
// sample was taken — confirm at the use sites.
type resticJobUsageCacheEntry struct {
	Known     bool
	Bytes     float64
	CheckedAt time.Time
}

// resticPersistedUsageEntry is a persisted usage value, encoded as
// {"bytes": ..., "updated_at": "..."}; updated_at is omitted when empty.
// The format of UpdatedAt is not visible here (presumably a timestamp string).
type resticPersistedUsageEntry struct {
	Bytes     float64 `json:"bytes"`
	UpdatedAt string  `json:"updated_at,omitempty"`
}

// resticPersistedUsageDocument is the JSON envelope
// {"jobs": [{"key": "...", "bytes": ..., "updated_at": "..."}]}; presumably
// the document stored under usageSecretKey — confirm.
type resticPersistedUsageDocument struct {
	Jobs []struct {
		Key       string  `json:"key"`
		Bytes     float64 `json:"bytes"`
		UpdatedAt string  `json:"updated_at,omitempty"`
	} `json:"jobs"`
}

// New constructs a server with fresh telemetry and in-memory policy state.
// It initializes every map field so later writes cannot hit a nil-map panic
// and wires the HTTP handler to s.route. It does not load persisted state or
// start background work; Start does that.
//
// NOTE(review): client and lh are concrete pointers assigned to interface
// fields, so a nil pointer becomes a non-nil interface value holding nil; a
// later `s.client == nil` or `s.longhorn == nil` check will not detect it.
func New(cfg *config.Config, client *k8s.Client, lh *longhorn.Client) *Server {
	s := &Server{
		cfg:        cfg,
		client:     client,
		longhorn:   lh,
		metrics:    newTelemetry(),
		ui:         newUIRenderer(),
		policies:   map[string]api.BackupPolicy{},
		jobUsage:   map[string]resticJobUsageCacheEntry{},
		usageStore: map[string]resticPersistedUsageEntry{},
	}
	s.handler = http.HandlerFunc(s.route)
	return s
}

// Start launches telemetry and policy refresh loops for the active server.
// It first loads persisted policies and restic usage, logging (not returning)
// any load error. It then runs one telemetry refresh, one B2 usage refresh
// and one policy cycle synchronously. Finally it starts a goroutine that
// repeats that work on cfg.MetricsRefreshInterval, cfg.PolicyEvalInterval and,
// only when cfg.B2Enabled, cfg.B2ScanInterval until ctx is cancelled. Start
// returns once the initial synchronous work is done.
func (s *Server) Start(ctx context.Context) { if err := s.loadPolicies(ctx); err != nil { log.Printf("policy load failed: %v", err) } if err := s.loadResticUsage(ctx); err != nil { log.Printf("restic usage load failed: %v", err) } s.refreshTelemetry(ctx) s.refreshB2Usage(ctx) s.runPolicyCycle(ctx) metricsTicker := time.NewTicker(s.cfg.MetricsRefreshInterval) policyTicker := time.NewTicker(s.cfg.PolicyEvalInterval) var b2Ticker *time.Ticker var b2Tick <-chan time.Time if s.cfg.B2Enabled { b2Ticker = time.NewTicker(s.cfg.B2ScanInterval) b2Tick = b2Ticker.C } go func() { defer metricsTicker.Stop() defer policyTicker.Stop() if b2Ticker != nil { defer b2Ticker.Stop() } for { select { case <-ctx.Done(): return case <-metricsTicker.C: s.refreshTelemetry(ctx) case <-policyTicker.C: s.runPolicyCycle(ctx) case <-b2Tick: s.refreshB2Usage(ctx) } } }() } // Handler returns the HTTP handler used by the embedded server. func (s *Server) Handler() http.Handler { return s.handler } func (s *Server) route(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/healthz": s.handleHealth(w, r) return case "/readyz": s.handleReady(w, r) return case "/metrics": s.metrics.Handler().ServeHTTP(w, r) return } identity, status, err := s.authorize(r) if err != nil { s.metrics.RecordAuthzDenied(authzReason(status, err)) writeError(w, status, err.Error()) return } r = r.WithContext(context.WithValue(r.Context(), authContextKey, identity)) switch r.URL.Path { case "/": s.handleUI(w, r) case "/v1/b2": s.handleB2Usage(w, r) case "/v1/whoami": s.handleWhoAmI(w, r) case "/v1/inventory": s.handleInventory(w, r) case "/v1/backups": s.handleBackups(w, r) case "/v1/backup": s.handleBackup(w, r) case "/v1/backup/namespace": s.handleNamespaceBackup(w, r) case "/v1/restores", "/v1/restore-test": s.handleRestore(w, r) case "/v1/restores/namespace": s.handleNamespaceRestore(w, r) case "/v1/policies": s.handlePolicies(w, r) default: if s.ui != nil && s.ui.ServeAsset(w, r) { return } if 
strings.HasPrefix(r.URL.Path, "/v1/policies/") { s.handlePolicyByID(w, r) return } // Serve SPA index for deep links (for example /backup) while preserving // explicit API and asset 404 behavior. if r.Method == http.MethodGet && !strings.HasPrefix(r.URL.Path, "/v1/") && !strings.Contains(r.URL.Path, ".") { s.handleUI(w, r) return } writeError(w, http.StatusNotFound, "not found") } } func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) { writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) } func (s *Server) handleReady(w http.ResponseWriter, _ *http.Request) { writeJSON(w, http.StatusOK, map[string]string{"status": "ready"}) } func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } if s.ui == nil { writeError(w, http.StatusInternalServerError, "UI renderer is unavailable") return } if err := s.ui.ServeIndex(w, r); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) } } func (s *Server) handleWhoAmI(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } identity := requesterFromContext(r.Context()) writeJSON(w, http.StatusOK, api.AuthInfoResponse{ Authenticated: identity.Authenticated, User: identity.User, Email: identity.Email, Groups: identity.Groups, AllowedGroups: s.cfg.AllowedGroups, }) }