267 lines
6.9 KiB
Go
267 lines
6.9 KiB
Go
|
|
package server
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"log"
|
||
|
|
"sort"
|
||
|
|
"strings"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"scm.bstein.dev/bstein/soteria/internal/api"
|
||
|
|
"scm.bstein.dev/bstein/soteria/internal/k8s"
|
||
|
|
)
|
||
|
|
|
||
|
|
func (s *Server) buildInventory(ctx context.Context) (api.InventoryResponse, error) {
|
||
|
|
pvcs, err := s.client.ListBoundPVCs(ctx)
|
||
|
|
if err != nil {
|
||
|
|
return api.InventoryResponse{}, err
|
||
|
|
}
|
||
|
|
|
||
|
|
resticJobsByPVC, resticLookupErrors := s.prefetchResticBackupJobs(ctx, pvcs)
|
||
|
|
|
||
|
|
groups := make(map[string][]api.PVCInventory)
|
||
|
|
for _, summary := range pvcs {
|
||
|
|
entry := api.PVCInventory{
|
||
|
|
Namespace: summary.Namespace,
|
||
|
|
PVC: summary.Name,
|
||
|
|
Volume: summary.VolumeName,
|
||
|
|
Phase: summary.Phase,
|
||
|
|
StorageClass: summary.StorageClass,
|
||
|
|
Capacity: summary.Capacity,
|
||
|
|
AccessModes: summary.AccessModes,
|
||
|
|
Driver: s.cfg.BackupDriver,
|
||
|
|
}
|
||
|
|
s.enrichPVCInventory(ctx, &entry, resticJobsByPVC, resticLookupErrors)
|
||
|
|
groups[summary.Namespace] = append(groups[summary.Namespace], entry)
|
||
|
|
}
|
||
|
|
|
||
|
|
namespaceNames := make([]string, 0, len(groups))
|
||
|
|
for namespace := range groups {
|
||
|
|
namespaceNames = append(namespaceNames, namespace)
|
||
|
|
}
|
||
|
|
sort.Strings(namespaceNames)
|
||
|
|
|
||
|
|
response := api.InventoryResponse{
|
||
|
|
GeneratedAt: time.Now().UTC().Format(time.RFC3339),
|
||
|
|
Namespaces: make([]api.NamespaceInventory, 0, len(namespaceNames)),
|
||
|
|
}
|
||
|
|
for _, namespace := range namespaceNames {
|
||
|
|
response.Namespaces = append(response.Namespaces, api.NamespaceInventory{
|
||
|
|
Name: namespace,
|
||
|
|
PVCs: groups[namespace],
|
||
|
|
})
|
||
|
|
}
|
||
|
|
return response, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Server) prefetchResticBackupJobs(ctx context.Context, pvcs []k8s.PVCSummary) (map[string][]k8s.BackupJobSummary, map[string]error) {
|
||
|
|
if s.cfg.BackupDriver != "restic" {
|
||
|
|
return nil, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
namespaces := map[string]struct{}{}
|
||
|
|
for _, pvc := range pvcs {
|
||
|
|
namespaces[pvc.Namespace] = struct{}{}
|
||
|
|
}
|
||
|
|
|
||
|
|
namespaceNames := make([]string, 0, len(namespaces))
|
||
|
|
for namespace := range namespaces {
|
||
|
|
namespaceNames = append(namespaceNames, namespace)
|
||
|
|
}
|
||
|
|
sort.Strings(namespaceNames)
|
||
|
|
|
||
|
|
jobsByPVC := map[string][]k8s.BackupJobSummary{}
|
||
|
|
lookupErrors := map[string]error{}
|
||
|
|
for _, namespace := range namespaceNames {
|
||
|
|
jobs, err := s.client.ListBackupJobs(ctx, namespace)
|
||
|
|
if err != nil {
|
||
|
|
lookupErrors[namespace] = err
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
for _, job := range jobs {
|
||
|
|
key := job.Namespace + "/" + job.PVC
|
||
|
|
jobsByPVC[key] = append(jobsByPVC[key], job)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
for key := range jobsByPVC {
|
||
|
|
sortBackupJobsNewestFirst(jobsByPVC[key])
|
||
|
|
}
|
||
|
|
|
||
|
|
return jobsByPVC, lookupErrors
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Server) enrichPVCInventory(
|
||
|
|
ctx context.Context,
|
||
|
|
entry *api.PVCInventory,
|
||
|
|
resticJobsByPVC map[string][]k8s.BackupJobSummary,
|
||
|
|
resticLookupErrors map[string]error,
|
||
|
|
) {
|
||
|
|
switch s.cfg.BackupDriver {
|
||
|
|
case "longhorn":
|
||
|
|
backups, err := s.longhorn.ListBackups(ctx, entry.Volume)
|
||
|
|
if err != nil {
|
||
|
|
entry.Healthy = false
|
||
|
|
entry.HealthReason = "lookup_failed"
|
||
|
|
entry.Error = err.Error()
|
||
|
|
return
|
||
|
|
}
|
||
|
|
entry.BackupCount = len(backups)
|
||
|
|
totalBackupSize := int64(0)
|
||
|
|
completedBackups := 0
|
||
|
|
for _, backup := range backups {
|
||
|
|
totalBackupSize += parseSizeBytes(backup.Size)
|
||
|
|
if strings.EqualFold(backup.State, "Completed") {
|
||
|
|
completedBackups++
|
||
|
|
}
|
||
|
|
}
|
||
|
|
entry.CompletedBackups = completedBackups
|
||
|
|
entry.TotalBackupSizeBytes = float64(totalBackupSize)
|
||
|
|
latest, latestTime, ok := latestCompletedBackup(backups)
|
||
|
|
if !ok {
|
||
|
|
entry.Healthy = false
|
||
|
|
if len(backups) == 0 {
|
||
|
|
entry.HealthReason = "missing"
|
||
|
|
} else {
|
||
|
|
entry.HealthReason = "no_completed"
|
||
|
|
}
|
||
|
|
return
|
||
|
|
}
|
||
|
|
entry.LastBackupAt = latest.Created
|
||
|
|
entry.LastBackupSizeBytes = float64(parseSizeBytes(latest.Size))
|
||
|
|
if latestTime.IsZero() {
|
||
|
|
entry.Healthy = false
|
||
|
|
entry.HealthReason = "unknown_timestamp"
|
||
|
|
return
|
||
|
|
}
|
||
|
|
entry.LastBackupAgeHours = roundHours(time.Since(latestTime).Hours())
|
||
|
|
entry.Healthy = time.Since(latestTime) <= s.cfg.BackupMaxAge
|
||
|
|
if entry.Healthy {
|
||
|
|
entry.HealthReason = "fresh"
|
||
|
|
} else {
|
||
|
|
entry.HealthReason = "stale"
|
||
|
|
}
|
||
|
|
case "restic":
|
||
|
|
if err, hasErr := resticLookupErrors[entry.Namespace]; hasErr {
|
||
|
|
entry.Healthy = false
|
||
|
|
entry.HealthReason = "lookup_failed"
|
||
|
|
entry.Error = err.Error()
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
key := entry.Namespace + "/" + entry.PVC
|
||
|
|
jobs := resticJobsByPVC[key]
|
||
|
|
if jobs == nil {
|
||
|
|
jobs = []k8s.BackupJobSummary{}
|
||
|
|
}
|
||
|
|
entry.BackupCount = len(jobs)
|
||
|
|
if len(jobs) > 0 {
|
||
|
|
entry.LastJobName = jobs[0].Name
|
||
|
|
entry.LastJobState = jobs[0].State
|
||
|
|
entry.LastJobProgressPct = backupJobProgressPct(jobs[0].State)
|
||
|
|
if !jobs[0].CreatedAt.IsZero() {
|
||
|
|
entry.LastJobStartedAt = jobs[0].CreatedAt.UTC().Format(time.RFC3339)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
completed := make([]k8s.BackupJobSummary, 0, len(jobs))
|
||
|
|
active := 0
|
||
|
|
for _, job := range jobs {
|
||
|
|
if backupJobInProgress(job.State) {
|
||
|
|
active++
|
||
|
|
}
|
||
|
|
if strings.EqualFold(job.State, "Completed") {
|
||
|
|
completed = append(completed, job)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
entry.ActiveBackups = active
|
||
|
|
entry.CompletedBackups = len(completed)
|
||
|
|
sizeSamples := completed
|
||
|
|
if len(sizeSamples) > 0 {
|
||
|
|
retained := sizeSamples[0].KeepLast
|
||
|
|
if retained > 0 && retained < len(sizeSamples) {
|
||
|
|
sizeSamples = sizeSamples[:retained]
|
||
|
|
}
|
||
|
|
}
|
||
|
|
totalStoredBytes := 0.0
|
||
|
|
storedSamples := 0
|
||
|
|
for index, job := range sizeSamples {
|
||
|
|
if index >= maxUsageSampleJobs {
|
||
|
|
break
|
||
|
|
}
|
||
|
|
storedBytes, ok := s.lookupResticStoredBytesForJob(ctx, entry.Namespace, job.Name)
|
||
|
|
if !ok {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
if index == 0 {
|
||
|
|
entry.LastBackupSizeBytes = storedBytes
|
||
|
|
}
|
||
|
|
totalStoredBytes += storedBytes
|
||
|
|
storedSamples++
|
||
|
|
}
|
||
|
|
if storedSamples > 0 {
|
||
|
|
entry.TotalBackupSizeBytes = totalStoredBytes
|
||
|
|
}
|
||
|
|
if len(completed) == 0 {
|
||
|
|
entry.Healthy = false
|
||
|
|
switch {
|
||
|
|
case active > 0:
|
||
|
|
entry.HealthReason = "in_progress"
|
||
|
|
case len(jobs) == 0:
|
||
|
|
entry.HealthReason = "missing"
|
||
|
|
default:
|
||
|
|
entry.HealthReason = "no_completed"
|
||
|
|
}
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
latestTime := backupJobTimestamp(completed[0])
|
||
|
|
if latestTime.IsZero() {
|
||
|
|
entry.Healthy = false
|
||
|
|
entry.HealthReason = "unknown_timestamp"
|
||
|
|
return
|
||
|
|
}
|
||
|
|
entry.LastBackupAt = latestTime.UTC().Format(time.RFC3339)
|
||
|
|
entry.LastBackupAgeHours = roundHours(time.Since(latestTime).Hours())
|
||
|
|
entry.Healthy = time.Since(latestTime) <= s.cfg.BackupMaxAge
|
||
|
|
if entry.Healthy {
|
||
|
|
entry.HealthReason = "fresh"
|
||
|
|
} else {
|
||
|
|
entry.HealthReason = "stale"
|
||
|
|
}
|
||
|
|
default:
|
||
|
|
entry.Healthy = false
|
||
|
|
entry.HealthReason = "unsupported_driver"
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func sortBackupJobsNewestFirst(items []k8s.BackupJobSummary) {
|
||
|
|
sort.Slice(items, func(i, j int) bool {
|
||
|
|
left := items[i].CompletionTime
|
||
|
|
if left.IsZero() {
|
||
|
|
left = items[i].CreatedAt
|
||
|
|
}
|
||
|
|
right := items[j].CompletionTime
|
||
|
|
if right.IsZero() {
|
||
|
|
right = items[j].CreatedAt
|
||
|
|
}
|
||
|
|
if left.Equal(right) {
|
||
|
|
return items[i].Name > items[j].Name
|
||
|
|
}
|
||
|
|
return left.After(right)
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Server) refreshTelemetry(ctx context.Context) {
|
||
|
|
refreshCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
|
||
|
|
defer cancel()
|
||
|
|
|
||
|
|
inventory, err := s.buildInventory(refreshCtx)
|
||
|
|
if err != nil {
|
||
|
|
log.Printf("inventory refresh failed: %v", err)
|
||
|
|
s.metrics.RecordInventoryFailure()
|
||
|
|
return
|
||
|
|
}
|
||
|
|
s.metrics.RecordInventory(inventory)
|
||
|
|
}
|