diff --git a/README.md b/README.md index 1ada9d3..a58b39a 100644 --- a/README.md +++ b/README.md @@ -92,8 +92,12 @@ UPS auto-shutdown trigger uses: - runtime threshold = `runtime_safety_factor * estimated_shutdown_budget` - default safety factor `1.25` - debounce across multiple polls to avoid noise +- emergency trigger budget defaults to `shutdown.emergency_budget_seconds` and is learned from historical successful shutdown runs whose reason starts with `ups-`, `emergency-`, or `drill-emergency` once at least `shutdown.emergency_history_min_samples` such runs exist +- UPS-triggered shutdown executes the emergency fast path by default (`shutdown.emergency_skip_drain: true`, `shutdown.emergency_skip_etcd_snapshot: true`) -Estimated shutdown budget is derived from historical successful shutdown runs (`/var/lib/hecate/runs.json`) with default fallback from config. +Estimated shutdown budgets are derived from historical successful shutdown runs (`/var/lib/hecate/runs.json`); until enough samples exist, each falls back to its config default: +- `estimated_shutdown_budget_seconds`: full/manual shutdown path (fallback `shutdown.default_budget_seconds`, used until `shutdown.history_min_samples` runs are recorded) +- `estimated_emergency_shutdown_budget_seconds`: UPS/emergency path (fallback `shutdown.emergency_budget_seconds`, used until `shutdown.emergency_history_min_samples` matching runs are recorded) Power metrics: - Hecate exposes Prometheus metrics on `:9560/metrics` by default. 
diff --git a/cmd/hecate/main.go b/cmd/hecate/main.go index 4455413..25f58ec 100644 --- a/cmd/hecate/main.go +++ b/cmd/hecate/main.go @@ -231,6 +231,7 @@ func runStatus(logger *log.Logger, args []string) error { logger.Printf("expected_flux_branch=%s", cfg.ExpectedFluxBranch) logger.Printf("control_planes=%v", cfg.ControlPlanes) logger.Printf("estimated_shutdown_budget_seconds=%d", orch.EstimatedShutdownSeconds()) + logger.Printf("estimated_emergency_shutdown_budget_seconds=%d", orch.EstimatedEmergencyShutdownSeconds()) intent, intentErr := state.ReadIntent(cfg.State.IntentPath) if intentErr != nil { logger.Printf("intent_read_error=%v", intentErr) diff --git a/configs/hecate.example.yaml b/configs/hecate.example.yaml index a626ac7..2a5dba9 100644 --- a/configs/hecate.example.yaml +++ b/configs/hecate.example.yaml @@ -22,11 +22,14 @@ local_bootstrap_paths: - infrastructure/sources/helm - infrastructure/metallb - infrastructure/traefik + - infrastructure/cert-manager - infrastructure/vault-csi - infrastructure/vault-injector - services/vault - infrastructure/postgres - services/gitea + - services/keycloak + - services/oauth2-proxy excluded_namespaces: - kube-system - kube-public @@ -50,6 +53,11 @@ startup: etcd_restore_control_plane: titan-0a shutdown: default_budget_seconds: 1380 + history_min_samples: 3 + emergency_budget_seconds: 420 + emergency_history_min_samples: 3 + emergency_skip_etcd_snapshot: true + emergency_skip_drain: true skip_etcd_snapshot: false skip_drain: false drain_parallelism: 6 diff --git a/configs/hecate.tethys.yaml b/configs/hecate.tethys.yaml index 2f3d0ef..766c822 100644 --- a/configs/hecate.tethys.yaml +++ b/configs/hecate.tethys.yaml @@ -88,11 +88,14 @@ local_bootstrap_paths: - infrastructure/sources/helm - infrastructure/metallb - infrastructure/traefik + - infrastructure/cert-manager - infrastructure/vault-csi - infrastructure/vault-injector - services/vault - infrastructure/postgres - services/gitea + - services/keycloak + - 
services/oauth2-proxy excluded_namespaces: - kube-system - kube-public @@ -116,6 +119,11 @@ startup: etcd_restore_control_plane: titan-0a shutdown: default_budget_seconds: 1380 + history_min_samples: 3 + emergency_budget_seconds: 420 + emergency_history_min_samples: 3 + emergency_skip_etcd_snapshot: true + emergency_skip_drain: true skip_etcd_snapshot: false skip_drain: false drain_parallelism: 6 diff --git a/configs/hecate.titan-db.yaml b/configs/hecate.titan-db.yaml index dada801..28dda02 100644 --- a/configs/hecate.titan-db.yaml +++ b/configs/hecate.titan-db.yaml @@ -88,11 +88,14 @@ local_bootstrap_paths: - infrastructure/sources/helm - infrastructure/metallb - infrastructure/traefik + - infrastructure/cert-manager - infrastructure/vault-csi - infrastructure/vault-injector - services/vault - infrastructure/postgres - services/gitea + - services/keycloak + - services/oauth2-proxy excluded_namespaces: - kube-system - kube-public @@ -116,6 +119,11 @@ startup: etcd_restore_control_plane: titan-0a shutdown: default_budget_seconds: 1380 + history_min_samples: 3 + emergency_budget_seconds: 420 + emergency_history_min_samples: 3 + emergency_skip_etcd_snapshot: true + emergency_skip_drain: true skip_etcd_snapshot: false skip_drain: false drain_parallelism: 6 diff --git a/internal/cluster/orchestrator.go b/internal/cluster/orchestrator.go index 3de87f8..2e3c47f 100644 --- a/internal/cluster/orchestrator.go +++ b/internal/cluster/orchestrator.go @@ -175,6 +175,14 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er } } + desiredFluxBranch := strings.TrimSpace(opts.ForceFluxBranch) + if desiredFluxBranch == "" { + desiredFluxBranch = strings.TrimSpace(o.cfg.ExpectedFluxBranch) + } + if err := o.ensureFluxBranch(ctx, desiredFluxBranch); err != nil { + return err + } + workers, err := o.effectiveWorkers(ctx) if err != nil { return err @@ -186,13 +194,6 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er 
o.startWorkers(ctx, workers) o.bestEffort("uncordon workers", func() error { return o.uncordonWorkers(ctx, workers) }) - if opts.ForceFluxBranch != "" { - patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, opts.ForceFluxBranch) - if _, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "gitrepository", "flux-system", "--type=merge", "-p", patch); err != nil { - return fmt.Errorf("force flux branch: %w", err) - } - } - needsLocalBootstrap := false bootstrapReasons := []string{} if !opts.SkipLocalBootstrap { @@ -393,7 +394,21 @@ func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err } +// EstimatedShutdownSeconds returns the store's estimate over all successful +// shutdown runs, falling back to DefaultBudgetSeconds while fewer than +// HistoryMinSamples runs are recorded. func (o *Orchestrator) EstimatedShutdownSeconds() int { - return o.store.ShutdownP95(o.cfg.Shutdown.DefaultBudgetSeconds) + return o.store.ShutdownP95WithMinSamples(o.cfg.Shutdown.DefaultBudgetSeconds, o.cfg.Shutdown.HistoryMinSamples) +} + +// EstimatedEmergencyShutdownSeconds returns the estimate restricted to runs +// whose reason starts with "ups-", "emergency-" or "drill-emergency", falling +// back to EmergencyBudgetSec while fewer than EmergencyMinSamples such runs exist. +func (o *Orchestrator) EstimatedEmergencyShutdownSeconds() int { + return o.store.ShutdownP95ByReasonPrefix( + o.cfg.Shutdown.EmergencyBudgetSec, + o.cfg.Shutdown.EmergencyMinSamples, + []string{"ups-", "emergency-", "drill-emergency"}, + ) } func (o *Orchestrator) finalizeRecord(record *state.RunRecord, err *error) { @@ -1131,6 +1146,51 @@ func (o *Orchestrator) reportFluxSource(ctx context.Context, forceBranch string) } } +// ensureFluxBranch makes the flux-system GitRepository track branch. It is a +// no-op when branch is blank or already current; when isNotFoundErr matches +// the read failure it logs a warning and returns nil. Other read or patch +// failures are returned wrapped. +func (o *Orchestrator) ensureFluxBranch(ctx context.Context, branch string) error { + branch = strings.TrimSpace(branch) + if branch == "" { + return nil + } + + out, err := o.kubectl( + ctx, + 10*time.Second, + "-n", "flux-system", + "get", "gitrepository", "flux-system", + "-o", "jsonpath={.spec.ref.branch}", + ) + if err != nil { + if isNotFoundErr(err) { + o.log.Printf("warning: flux gitrepository/flux-system not found while ensuring branch=%s", branch) + return nil + } + return fmt.Errorf("read flux source branch: %w", err) + } + current := strings.TrimSpace(out) + if current == branch { + return nil + } + // %q emits the branch as a quoted, escaped string so a stray quote or + // backslash cannot alter the structure of the merge patch. + patch := fmt.Sprintf(`{"spec":{"ref":{"branch":%q}}}`, branch) + if 
_, err := o.kubectl( + ctx, + 20*time.Second, + "-n", "flux-system", + "patch", "gitrepository", "flux-system", + "--type=merge", + "-p", patch, + ); err != nil { + return fmt.Errorf("set flux source branch %q (current %q): %w", branch, current, err) + } + o.log.Printf("updated flux source branch from %q to %q", current, branch) + return nil +} + func (o *Orchestrator) bootstrapLocal(ctx context.Context) error { failures := 0 for _, rel := range o.cfg.LocalBootstrapPaths { diff --git a/internal/config/config.go b/internal/config/config.go index 5793c86..56aec4d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -45,6 +45,11 @@ type Startup struct { type Shutdown struct { DefaultBudgetSeconds int `yaml:"default_budget_seconds"` + HistoryMinSamples int `yaml:"history_min_samples"` + EmergencyBudgetSec int `yaml:"emergency_budget_seconds"` + EmergencyMinSamples int `yaml:"emergency_history_min_samples"` + EmergencySkipEtcd bool `yaml:"emergency_skip_etcd_snapshot"` + EmergencySkipDrain bool `yaml:"emergency_skip_drain"` SkipEtcdSnapshot bool `yaml:"skip_etcd_snapshot"` SkipDrain bool `yaml:"skip_drain"` DrainParallelism int `yaml:"drain_parallelism"` @@ -127,6 +132,15 @@ func (c Config) Validate() error { if c.Shutdown.DefaultBudgetSeconds <= 0 { return fmt.Errorf("config.shutdown.default_budget_seconds must be > 0") } + if c.Shutdown.HistoryMinSamples <= 0 { + return fmt.Errorf("config.shutdown.history_min_samples must be > 0") + } + if c.Shutdown.EmergencyBudgetSec <= 0 { + return fmt.Errorf("config.shutdown.emergency_budget_seconds must be > 0") + } + if c.Shutdown.EmergencyMinSamples <= 0 { + return fmt.Errorf("config.shutdown.emergency_history_min_samples must be > 0") + } if c.Shutdown.DrainParallelism <= 0 { return fmt.Errorf("config.shutdown.drain_parallelism must be > 0") } @@ -208,11 +222,14 @@ func defaults() Config { "infrastructure/sources/helm", "infrastructure/metallb", "infrastructure/traefik", + "infrastructure/cert-manager", 
"infrastructure/vault-csi", "infrastructure/vault-injector", "services/vault", "infrastructure/postgres", "services/gitea", + "services/keycloak", + "services/oauth2-proxy", }, ExcludedNamespaces: []string{ "kube-system", @@ -239,6 +256,11 @@ func defaults() Config { }, Shutdown: Shutdown{ DefaultBudgetSeconds: 1380, + HistoryMinSamples: 3, + EmergencyBudgetSec: 420, + EmergencyMinSamples: 3, + EmergencySkipEtcd: true, + EmergencySkipDrain: true, DrainParallelism: 6, ScaleParallelism: 8, SSHParallelism: 8, @@ -306,6 +328,15 @@ func (c *Config) applyDefaults() { if c.Shutdown.DefaultBudgetSeconds <= 0 { c.Shutdown.DefaultBudgetSeconds = 1380 } + if c.Shutdown.HistoryMinSamples <= 0 { + c.Shutdown.HistoryMinSamples = 3 + } + if c.Shutdown.EmergencyBudgetSec <= 0 { + c.Shutdown.EmergencyBudgetSec = 420 + } + if c.Shutdown.EmergencyMinSamples <= 0 { + c.Shutdown.EmergencyMinSamples = 3 + } if c.Shutdown.DrainParallelism <= 0 { c.Shutdown.DrainParallelism = 6 } diff --git a/internal/service/daemon.go b/internal/service/daemon.go index 0d88034..77fe119 100644 --- a/internal/service/daemon.go +++ b/internal/service/daemon.go @@ -89,7 +89,7 @@ func (d *Daemon) Run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case <-t.C: - budget := d.orch.EstimatedShutdownSeconds() + budget := d.orch.EstimatedEmergencyShutdownSeconds() threshold := int(math.Ceil(float64(budget) * d.cfg.UPS.RuntimeSafetyFactor)) d.exporter.UpdateBudget(budget) @@ -177,7 +177,11 @@ func (d *Daemon) triggerShutdown(ctx context.Context, reason string) error { d.log.Printf("warning: forward shutdown failed; falling back to local shutdown: %v", err) } } - if err := d.orch.Shutdown(ctx, cluster.ShutdownOptions{Reason: reason}); err != nil { + if err := d.orch.Shutdown(ctx, cluster.ShutdownOptions{ + Reason: reason, + SkipDrain: d.cfg.Shutdown.EmergencySkipDrain, + SkipEtcdSnapshot: d.cfg.Shutdown.EmergencySkipEtcd, + }); err != nil { return err } if setErr := 
state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShutdownComplete, reason, "daemon-local"); setErr != nil { @@ -194,11 +198,13 @@ func (d *Daemon) forwardShutdown(ctx context.Context, reason string) error { runCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - remoteCmd := fmt.Sprintf( - "sudo /usr/local/bin/hecate shutdown --config %q --execute --reason %q", - d.cfg.Coordination.ForwardShutdownConfig, - reason, - ) + remoteCmd := fmt.Sprintf("sudo /usr/local/bin/hecate shutdown --config %q --execute --reason %q", d.cfg.Coordination.ForwardShutdownConfig, reason) + if d.cfg.Shutdown.EmergencySkipEtcd { + remoteCmd += " --skip-etcd-snapshot" + } + if d.cfg.Shutdown.EmergencySkipDrain { + remoteCmd += " --skip-drain" + } host := d.cfg.Coordination.ForwardShutdownHost if mapped, ok := d.cfg.SSHNodeHosts[host]; ok && strings.TrimSpace(mapped) != "" { host = strings.TrimSpace(mapped) diff --git a/internal/state/store.go b/internal/state/store.go index 85b1b0f..bb9feb3 100644 --- a/internal/state/store.go +++ b/internal/state/store.go @@ -167,17 +167,62 @@ func (s *Store) loadUnlocked() ([]RunRecord, error) { } +// ShutdownP95 returns the shutdown duration estimate over every successful +// shutdown record; a single sample is enough to replace defaultSeconds. func (s *Store) ShutdownP95(defaultSeconds int) int { + return s.shutdownP95(defaultSeconds, 1, nil) +} + +// ShutdownP95WithMinSamples is ShutdownP95 with a configurable sample floor: +// defaultSeconds is returned until at least minSamples usable records exist. +func (s *Store) ShutdownP95WithMinSamples(defaultSeconds int, minSamples int) int { + return s.shutdownP95(defaultSeconds, minSamples, nil) +} + +// ShutdownP95ByReasonPrefix restricts the estimate to records whose Reason +// starts with one of reasonPrefixes (case-insensitive, surrounding whitespace +// ignored); a list with no non-blank prefix disables the filter. +func (s *Store) ShutdownP95ByReasonPrefix(defaultSeconds int, minSamples int, reasonPrefixes []string) int { + return s.shutdownP95(defaultSeconds, minSamples, reasonPrefixes) +} + +// shutdownP95 collects DurationSeconds from successful "shutdown" records that +// pass the optional reason-prefix filter. It returns defaultSeconds when the +// history cannot be loaded or fewer than minSamples records qualify; a +// non-positive minSamples is treated as 1. +func (s *Store) shutdownP95(defaultSeconds int, minSamples int, reasonPrefixes []string) int { + if minSamples <= 0 { + minSamples = 1 + } records, err := s.Load() if err != nil { return defaultSeconds } var d []int + filteredPrefixes := make([]string, 0, len(reasonPrefixes)) + for _, prefix := range reasonPrefixes { + p := strings.ToLower(strings.TrimSpace(prefix)) + if p != "" { + filteredPrefixes = 
append(filteredPrefixes, p) + } + } + matchesPrefix := func(reason string) bool { + if len(filteredPrefixes) == 0 { + return true + } + r := strings.ToLower(strings.TrimSpace(reason)) + for _, prefix := range filteredPrefixes { + if strings.HasPrefix(r, prefix) { + return true + } + } + return false + } for _, r := range records { - if r.Action == "shutdown" && r.Success && r.DurationSeconds > 0 { + if r.Action == "shutdown" && r.Success && r.DurationSeconds > 0 && matchesPrefix(r.Reason) { d = append(d, r.DurationSeconds) } } - if len(d) == 0 { + if len(d) < minSamples { return defaultSeconds } sort.Ints(d) diff --git a/internal/state/store_test.go b/internal/state/store_test.go index 3eb5dc8..838054e 100644 --- a/internal/state/store_test.go +++ b/internal/state/store_test.go @@ -1,11 +1,13 @@ package state import ( + "encoding/json" "os" "path/filepath" "strconv" "strings" "testing" + "time" ) func TestAcquireLockLifecycle(t *testing.T) { @@ -85,3 +87,80 @@ func TestStoreLoadAutoHealsCorruptJSON(t *testing.T) { t.Fatalf("expected 1 backup file, got %d (%v)", len(matches), matches) } } + +// TestShutdownP95WithMinSamplesFallsBackWhenHistorySparse checks that one +// recorded shutdown does not override the default when minSamples is 3. +func TestShutdownP95WithMinSamplesFallsBackWhenHistorySparse(t *testing.T) { + p := filepath.Join(t.TempDir(), "runs.json") + records := []RunRecord{ + { + ID: "shutdown-1", + Action: "shutdown", + Reason: "manual", + StartedAt: time.Now().UTC(), + EndedAt: time.Now().UTC(), + DurationSeconds: 45, + Success: true, + }, + } + b, err := json.Marshal(records) + if err != nil { + t.Fatalf("marshal records: %v", err) + } + if err := os.WriteFile(p, b, 0o640); err != nil { + t.Fatalf("write records: %v", err) + } + + got := New(p).ShutdownP95WithMinSamples(300, 3) + if got != 300 { + t.Fatalf("expected fallback default 300 with sparse history, got %d", got) + } +} + +// TestShutdownP95ByReasonPrefixFiltersSamples checks that only records whose +// reason starts with the "ups-" prefix contribute to the estimate. +func TestShutdownP95ByReasonPrefixFiltersSamples(t *testing.T) { + p := filepath.Join(t.TempDir(), "runs.json") + now := time.Now().UTC() + records := []RunRecord{ + { + ID: "shutdown-1", + Action: "shutdown", + Reason: 
"ups-threshold target=Pyrphoros", + StartedAt: now, + EndedAt: now, + DurationSeconds: 180, + Success: true, + }, + { + ID: "shutdown-2", + Action: "shutdown", + Reason: "ups-threshold target=Pyrphoros", + StartedAt: now, + EndedAt: now, + DurationSeconds: 220, + Success: true, + }, + { + ID: "shutdown-3", + Action: "shutdown", + Reason: "manual-maintenance", + StartedAt: now, + EndedAt: now, + DurationSeconds: 1200, + Success: true, + }, + } + b, err := json.Marshal(records) + if err != nil { + t.Fatalf("marshal records: %v", err) + } + if err := os.WriteFile(p, b, 0o640); err != nil { + t.Fatalf("write records: %v", err) + } + + got := New(p).ShutdownP95ByReasonPrefix(420, 2, []string{"ups-"}) + if got != 220 { + t.Fatalf("expected filtered p95 of 220, got %d", got) + } +}