soteria/internal/server/inventory_builder.go

267 lines
6.9 KiB
Go
Raw Permalink Normal View History

package server
import (
"context"
"log"
"sort"
"strings"
"time"
"scm.bstein.dev/bstein/soteria/internal/api"
"scm.bstein.dev/bstein/soteria/internal/k8s"
)
// buildInventory lists every bound PVC, annotates each one with backup status
// for the configured backup driver, and groups the results by namespace.
//
// Namespaces are emitted in ascending name order; within a namespace, PVCs keep
// the order returned by ListBoundPVCs. GeneratedAt is an RFC 3339 UTC timestamp.
// The only error returned is a failure to list the PVCs themselves; per-PVC
// lookup problems are recorded on the individual entries by enrichPVCInventory.
func (s *Server) buildInventory(ctx context.Context) (api.InventoryResponse, error) {
	pvcs, err := s.client.ListBoundPVCs(ctx)
	if err != nil {
		return api.InventoryResponse{}, err
	}

	// For the restic driver, backup jobs are fetched once per namespace up
	// front instead of once per PVC.
	jobsByPVC, jobLookupErrs := s.prefetchResticBackupJobs(ctx, pvcs)

	byNamespace := make(map[string][]api.PVCInventory)
	for _, pvc := range pvcs {
		item := api.PVCInventory{
			Namespace:    pvc.Namespace,
			PVC:          pvc.Name,
			Volume:       pvc.VolumeName,
			Phase:        pvc.Phase,
			StorageClass: pvc.StorageClass,
			Capacity:     pvc.Capacity,
			AccessModes:  pvc.AccessModes,
			Driver:       s.cfg.BackupDriver,
		}
		s.enrichPVCInventory(ctx, &item, jobsByPVC, jobLookupErrs)
		byNamespace[pvc.Namespace] = append(byNamespace[pvc.Namespace], item)
	}

	// Map iteration order is random, so sort the namespace names for stable output.
	names := make([]string, 0, len(byNamespace))
	for name := range byNamespace {
		names = append(names, name)
	}
	sort.Strings(names)

	generatedAt := time.Now().UTC().Format(time.RFC3339)
	namespaces := make([]api.NamespaceInventory, 0, len(names))
	for _, name := range names {
		namespaces = append(namespaces, api.NamespaceInventory{
			Name: name,
			PVCs: byNamespace[name],
		})
	}
	return api.InventoryResponse{
		GeneratedAt: generatedAt,
		Namespaces:  namespaces,
	}, nil
}
// prefetchResticBackupJobs loads the backup jobs of every namespace that owns
// at least one of pvcs. This lets restic enrichment work from memory instead of
// issuing one ListBackupJobs call per PVC.
//
// It returns (nil, nil) unless the configured backup driver is "restic".
// Otherwise it returns two maps:
//   - jobs keyed by "namespace/pvc", built from each job's own Namespace and PVC
//     fields, with every slice sorted by sortBackupJobsNewestFirst;
//   - per-namespace errors from failed ListBackupJobs calls.
//
// A namespace whose lookup fails is recorded and skipped; the remaining
// namespaces are still queried, in ascending name order.
func (s *Server) prefetchResticBackupJobs(ctx context.Context, pvcs []k8s.PVCSummary) (map[string][]k8s.BackupJobSummary, map[string]error) {
	if s.cfg.BackupDriver != "restic" {
		return nil, nil
	}

	// Deduplicate namespaces in a single pass, then sort for a deterministic
	// query order.
	seen := make(map[string]bool, len(pvcs))
	namespaceList := make([]string, 0, len(pvcs))
	for _, pvc := range pvcs {
		if seen[pvc.Namespace] {
			continue
		}
		seen[pvc.Namespace] = true
		namespaceList = append(namespaceList, pvc.Namespace)
	}
	sort.Strings(namespaceList)

	jobsByPVC := make(map[string][]k8s.BackupJobSummary)
	lookupErrors := make(map[string]error)
	for _, ns := range namespaceList {
		jobs, listErr := s.client.ListBackupJobs(ctx, ns)
		if listErr != nil {
			lookupErrors[ns] = listErr
			continue
		}
		for _, job := range jobs {
			pvcKey := job.Namespace + "/" + job.PVC
			jobsByPVC[pvcKey] = append(jobsByPVC[pvcKey], job)
		}
	}

	// Each group shares its backing array with the map entry, so sorting it in
	// place reorders the stored jobs.
	for _, group := range jobsByPVC {
		sortBackupJobsNewestFirst(group)
	}
	return jobsByPVC, lookupErrors
}
// enrichPVCInventory fills in the backup-related fields of entry according to
// the configured backup driver (s.cfg.BackupDriver).
//
// Behavior by driver:
//   - "longhorn": queries Longhorn for the backups of entry.Volume.
//   - "restic": uses the job listings prefetched by prefetchResticBackupJobs
//     (keyed by "namespace/pvc") and the per-namespace lookup errors from that
//     same call.
//   - any other driver: marks the entry unhealthy with reason "unsupported_driver".
//
// Healthy and HealthReason are always set. Possible reasons are "lookup_failed",
// "missing", "no_completed", "in_progress" (restic only), "unknown_timestamp",
// "fresh", "stale" and "unsupported_driver".
func (s *Server) enrichPVCInventory(
	ctx context.Context,
	entry *api.PVCInventory,
	resticJobsByPVC map[string][]k8s.BackupJobSummary,
	resticLookupErrors map[string]error,
) {
	switch s.cfg.BackupDriver {
	case "longhorn":
		s.enrichLonghornPVCInventory(ctx, entry)
	case "restic":
		s.enrichResticPVCInventory(ctx, entry, resticJobsByPVC, resticLookupErrors)
	default:
		entry.Healthy = false
		entry.HealthReason = "unsupported_driver"
	}
}

// enrichLonghornPVCInventory populates entry from the Longhorn backups of entry.Volume.
func (s *Server) enrichLonghornPVCInventory(ctx context.Context, entry *api.PVCInventory) {
	backups, err := s.longhorn.ListBackups(ctx, entry.Volume)
	if err != nil {
		entry.Healthy = false
		entry.HealthReason = "lookup_failed"
		entry.Error = err.Error()
		return
	}
	entry.BackupCount = len(backups)

	// The total size is summed over every returned backup, whatever its state;
	// only the completed count is filtered.
	totalBackupSize := int64(0)
	completedBackups := 0
	for _, backup := range backups {
		totalBackupSize += parseSizeBytes(backup.Size)
		if strings.EqualFold(backup.State, "Completed") {
			completedBackups++
		}
	}
	entry.CompletedBackups = completedBackups
	entry.TotalBackupSizeBytes = float64(totalBackupSize)

	latest, latestTime, ok := latestCompletedBackup(backups)
	if !ok {
		entry.Healthy = false
		if len(backups) == 0 {
			entry.HealthReason = "missing"
		} else {
			entry.HealthReason = "no_completed"
		}
		return
	}
	// LastBackupAt is Longhorn's own Created string, recorded even when it
	// could not be parsed into a timestamp.
	entry.LastBackupAt = latest.Created
	entry.LastBackupSizeBytes = float64(parseSizeBytes(latest.Size))
	s.applyBackupFreshness(entry, latestTime)
}

// enrichResticPVCInventory populates entry from the prefetched restic backup
// jobs stored under entry's "namespace/pvc" key.
func (s *Server) enrichResticPVCInventory(
	ctx context.Context,
	entry *api.PVCInventory,
	resticJobsByPVC map[string][]k8s.BackupJobSummary,
	resticLookupErrors map[string]error,
) {
	if err, hasErr := resticLookupErrors[entry.Namespace]; hasErr {
		entry.Healthy = false
		entry.HealthReason = "lookup_failed"
		entry.Error = err.Error()
		return
	}

	// A missing key yields a nil slice, which behaves as "no jobs" below.
	jobs := resticJobsByPVC[entry.Namespace+"/"+entry.PVC]
	entry.BackupCount = len(jobs)
	if len(jobs) > 0 {
		// jobs were sorted by sortBackupJobsNewestFirst, so jobs[0] is the
		// newest job of any state.
		newest := jobs[0]
		entry.LastJobName = newest.Name
		entry.LastJobState = newest.State
		entry.LastJobProgressPct = backupJobProgressPct(newest.State)
		if !newest.CreatedAt.IsZero() {
			entry.LastJobStartedAt = newest.CreatedAt.UTC().Format(time.RFC3339)
		}
	}

	completed := make([]k8s.BackupJobSummary, 0, len(jobs))
	active := 0
	for _, job := range jobs {
		if backupJobInProgress(job.State) {
			active++
		}
		if strings.EqualFold(job.State, "Completed") {
			completed = append(completed, job)
		}
	}
	entry.ActiveBackups = active
	entry.CompletedBackups = len(completed)

	// Only size the newest KeepLast completed jobs, taking KeepLast from the
	// newest completed job. NOTE(review): presumably KeepLast is the restic
	// retention count, so older snapshots are already pruned — confirm.
	sizeSamples := completed
	if len(sizeSamples) > 0 {
		retained := sizeSamples[0].KeepLast
		if retained > 0 && retained < len(sizeSamples) {
			sizeSamples = sizeSamples[:retained]
		}
	}
	totalStoredBytes := 0.0
	storedSamples := 0
	for index, job := range sizeSamples {
		if index >= maxUsageSampleJobs {
			break
		}
		storedBytes, ok := s.lookupResticStoredBytesForJob(ctx, entry.Namespace, job.Name)
		if !ok {
			continue
		}
		if index == 0 {
			entry.LastBackupSizeBytes = storedBytes
		}
		totalStoredBytes += storedBytes
		storedSamples++
	}
	// Leave TotalBackupSizeBytes at zero when no sample could be measured.
	if storedSamples > 0 {
		entry.TotalBackupSizeBytes = totalStoredBytes
	}

	if len(completed) == 0 {
		entry.Healthy = false
		switch {
		case active > 0:
			entry.HealthReason = "in_progress"
		case len(jobs) == 0:
			entry.HealthReason = "missing"
		default:
			entry.HealthReason = "no_completed"
		}
		return
	}

	latestTime := backupJobTimestamp(completed[0])
	if !latestTime.IsZero() {
		entry.LastBackupAt = latestTime.UTC().Format(time.RFC3339)
	}
	s.applyBackupFreshness(entry, latestTime)
}

// applyBackupFreshness sets entry's age and health from the timestamp of its
// newest completed backup. The outcomes are:
//   - zero timestamp: unhealthy, reason "unknown_timestamp";
//   - age at most s.cfg.BackupMaxAge: healthy, reason "fresh";
//   - otherwise: unhealthy, reason "stale".
func (s *Server) applyBackupFreshness(entry *api.PVCInventory, latestTime time.Time) {
	if latestTime.IsZero() {
		entry.Healthy = false
		entry.HealthReason = "unknown_timestamp"
		return
	}
	// Measure the age once so the reported hours and the health verdict are
	// derived from the same clock reading.
	age := time.Since(latestTime)
	entry.LastBackupAgeHours = roundHours(age.Hours())
	entry.Healthy = age <= s.cfg.BackupMaxAge
	if entry.Healthy {
		entry.HealthReason = "fresh"
	} else {
		entry.HealthReason = "stale"
	}
}
// sortBackupJobsNewestFirst orders items in place from newest to oldest.
//
// A job's effective time is its CompletionTime, or its CreatedAt when
// CompletionTime is zero. Jobs with equal effective times are ordered by Name,
// descending, so the result is deterministic unless two jobs share both an
// effective time and a name.
func sortBackupJobsNewestFirst(items []k8s.BackupJobSummary) {
	sort.Slice(items, func(a, b int) bool {
		first, second := &items[a], &items[b]
		firstAt, secondAt := first.CompletionTime, second.CompletionTime
		if firstAt.IsZero() {
			firstAt = first.CreatedAt
		}
		if secondAt.IsZero() {
			secondAt = second.CreatedAt
		}
		if !firstAt.Equal(secondAt) {
			return firstAt.After(secondAt)
		}
		return first.Name > second.Name
	})
}
// refreshTelemetry rebuilds the PVC backup inventory and passes it to the
// metrics recorder. The build runs under a two-minute timeout derived from ctx.
// If the build fails, the error is logged, RecordInventoryFailure is called,
// and RecordInventory is not called.
func (s *Server) refreshTelemetry(ctx context.Context) {
	timeoutCtx, release := context.WithTimeout(ctx, 2*time.Minute)
	defer release()

	inventory, buildErr := s.buildInventory(timeoutCtx)
	if buildErr == nil {
		s.metrics.RecordInventory(inventory)
		return
	}
	log.Printf("inventory refresh failed: %v", buildErr)
	s.metrics.RecordInventoryFailure()
}