hecate: harden emergency thresholds and bootstrap branch handling

This commit is contained in:
Brad Stein 2026-04-05 00:15:09 -03:00
parent 4c03be1e9b
commit f020f77d2b
10 changed files with 241 additions and 18 deletions

View File

@ -92,8 +92,12 @@ UPS auto-shutdown trigger uses:
- runtime threshold = `runtime_safety_factor * estimated_shutdown_budget` - runtime threshold = `runtime_safety_factor * estimated_shutdown_budget`
- default safety factor `1.25` - default safety factor `1.25`
- debounce across multiple polls to avoid noise - debounce across multiple polls to avoid noise
- emergency trigger budget defaults to `shutdown.emergency_budget_seconds` and is learned from historical UPS-triggered shutdown runs once enough samples exist
- UPS-triggered shutdown executes the emergency fast path by default (`shutdown.emergency_skip_drain: true`, `shutdown.emergency_skip_etcd_snapshot: true`)
Estimated shutdown budget is derived from historical successful shutdown runs (`/var/lib/hecate/runs.json`) with default fallback from config. Estimated shutdown budgets are derived from historical successful shutdown runs (`/var/lib/hecate/runs.json`) with config fallbacks:
- `estimated_shutdown_budget_seconds`: full/manual shutdown path
- `estimated_emergency_shutdown_budget_seconds`: UPS/emergency path
Power metrics: Power metrics:
- Hecate exposes Prometheus metrics on `:9560/metrics` by default. - Hecate exposes Prometheus metrics on `:9560/metrics` by default.

View File

@ -231,6 +231,7 @@ func runStatus(logger *log.Logger, args []string) error {
logger.Printf("expected_flux_branch=%s", cfg.ExpectedFluxBranch) logger.Printf("expected_flux_branch=%s", cfg.ExpectedFluxBranch)
logger.Printf("control_planes=%v", cfg.ControlPlanes) logger.Printf("control_planes=%v", cfg.ControlPlanes)
logger.Printf("estimated_shutdown_budget_seconds=%d", orch.EstimatedShutdownSeconds()) logger.Printf("estimated_shutdown_budget_seconds=%d", orch.EstimatedShutdownSeconds())
logger.Printf("estimated_emergency_shutdown_budget_seconds=%d", orch.EstimatedEmergencyShutdownSeconds())
intent, intentErr := state.ReadIntent(cfg.State.IntentPath) intent, intentErr := state.ReadIntent(cfg.State.IntentPath)
if intentErr != nil { if intentErr != nil {
logger.Printf("intent_read_error=%v", intentErr) logger.Printf("intent_read_error=%v", intentErr)

View File

@ -22,11 +22,14 @@ local_bootstrap_paths:
- infrastructure/sources/helm - infrastructure/sources/helm
- infrastructure/metallb - infrastructure/metallb
- infrastructure/traefik - infrastructure/traefik
- infrastructure/cert-manager
- infrastructure/vault-csi - infrastructure/vault-csi
- infrastructure/vault-injector - infrastructure/vault-injector
- services/vault - services/vault
- infrastructure/postgres - infrastructure/postgres
- services/gitea - services/gitea
- services/keycloak
- services/oauth2-proxy
excluded_namespaces: excluded_namespaces:
- kube-system - kube-system
- kube-public - kube-public
@ -50,6 +53,11 @@ startup:
etcd_restore_control_plane: titan-0a etcd_restore_control_plane: titan-0a
shutdown: shutdown:
default_budget_seconds: 1380 default_budget_seconds: 1380
history_min_samples: 3
emergency_budget_seconds: 420
emergency_history_min_samples: 3
emergency_skip_etcd_snapshot: true
emergency_skip_drain: true
skip_etcd_snapshot: false skip_etcd_snapshot: false
skip_drain: false skip_drain: false
drain_parallelism: 6 drain_parallelism: 6

View File

@ -88,11 +88,14 @@ local_bootstrap_paths:
- infrastructure/sources/helm - infrastructure/sources/helm
- infrastructure/metallb - infrastructure/metallb
- infrastructure/traefik - infrastructure/traefik
- infrastructure/cert-manager
- infrastructure/vault-csi - infrastructure/vault-csi
- infrastructure/vault-injector - infrastructure/vault-injector
- services/vault - services/vault
- infrastructure/postgres - infrastructure/postgres
- services/gitea - services/gitea
- services/keycloak
- services/oauth2-proxy
excluded_namespaces: excluded_namespaces:
- kube-system - kube-system
- kube-public - kube-public
@ -116,6 +119,11 @@ startup:
etcd_restore_control_plane: titan-0a etcd_restore_control_plane: titan-0a
shutdown: shutdown:
default_budget_seconds: 1380 default_budget_seconds: 1380
history_min_samples: 3
emergency_budget_seconds: 420
emergency_history_min_samples: 3
emergency_skip_etcd_snapshot: true
emergency_skip_drain: true
skip_etcd_snapshot: false skip_etcd_snapshot: false
skip_drain: false skip_drain: false
drain_parallelism: 6 drain_parallelism: 6

View File

@ -88,11 +88,14 @@ local_bootstrap_paths:
- infrastructure/sources/helm - infrastructure/sources/helm
- infrastructure/metallb - infrastructure/metallb
- infrastructure/traefik - infrastructure/traefik
- infrastructure/cert-manager
- infrastructure/vault-csi - infrastructure/vault-csi
- infrastructure/vault-injector - infrastructure/vault-injector
- services/vault - services/vault
- infrastructure/postgres - infrastructure/postgres
- services/gitea - services/gitea
- services/keycloak
- services/oauth2-proxy
excluded_namespaces: excluded_namespaces:
- kube-system - kube-system
- kube-public - kube-public
@ -116,6 +119,11 @@ startup:
etcd_restore_control_plane: titan-0a etcd_restore_control_plane: titan-0a
shutdown: shutdown:
default_budget_seconds: 1380 default_budget_seconds: 1380
history_min_samples: 3
emergency_budget_seconds: 420
emergency_history_min_samples: 3
emergency_skip_etcd_snapshot: true
emergency_skip_drain: true
skip_etcd_snapshot: false skip_etcd_snapshot: false
skip_drain: false skip_drain: false
drain_parallelism: 6 drain_parallelism: 6

View File

@ -175,6 +175,14 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er
} }
} }
desiredFluxBranch := strings.TrimSpace(opts.ForceFluxBranch)
if desiredFluxBranch == "" {
desiredFluxBranch = strings.TrimSpace(o.cfg.ExpectedFluxBranch)
}
if err := o.ensureFluxBranch(ctx, desiredFluxBranch); err != nil {
return err
}
workers, err := o.effectiveWorkers(ctx) workers, err := o.effectiveWorkers(ctx)
if err != nil { if err != nil {
return err return err
@ -186,13 +194,6 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er
o.startWorkers(ctx, workers) o.startWorkers(ctx, workers)
o.bestEffort("uncordon workers", func() error { return o.uncordonWorkers(ctx, workers) }) o.bestEffort("uncordon workers", func() error { return o.uncordonWorkers(ctx, workers) })
if opts.ForceFluxBranch != "" {
patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, opts.ForceFluxBranch)
if _, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "gitrepository", "flux-system", "--type=merge", "-p", patch); err != nil {
return fmt.Errorf("force flux branch: %w", err)
}
}
needsLocalBootstrap := false needsLocalBootstrap := false
bootstrapReasons := []string{} bootstrapReasons := []string{}
if !opts.SkipLocalBootstrap { if !opts.SkipLocalBootstrap {
@ -393,7 +394,15 @@ func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err
} }
func (o *Orchestrator) EstimatedShutdownSeconds() int { func (o *Orchestrator) EstimatedShutdownSeconds() int {
return o.store.ShutdownP95(o.cfg.Shutdown.DefaultBudgetSeconds) return o.store.ShutdownP95WithMinSamples(o.cfg.Shutdown.DefaultBudgetSeconds, o.cfg.Shutdown.HistoryMinSamples)
}
func (o *Orchestrator) EstimatedEmergencyShutdownSeconds() int {
return o.store.ShutdownP95ByReasonPrefix(
o.cfg.Shutdown.EmergencyBudgetSec,
o.cfg.Shutdown.EmergencyMinSamples,
[]string{"ups-", "emergency-", "drill-emergency"},
)
} }
func (o *Orchestrator) finalizeRecord(record *state.RunRecord, err *error) { func (o *Orchestrator) finalizeRecord(record *state.RunRecord, err *error) {
@ -1131,6 +1140,45 @@ func (o *Orchestrator) reportFluxSource(ctx context.Context, forceBranch string)
} }
} }
// ensureFluxBranch reconciles the branch tracked by the flux-system
// GitRepository with the desired branch. A blank desired branch is a no-op.
// A missing GitRepository object is tolerated with a warning (local
// bootstrap may not have created it yet); any other read or patch failure
// is returned to the caller. The object is only patched when the current
// branch differs from the desired one.
func (o *Orchestrator) ensureFluxBranch(ctx context.Context, branch string) error {
	want := strings.TrimSpace(branch)
	if want == "" {
		return nil
	}
	raw, err := o.kubectl(ctx, 10*time.Second,
		"-n", "flux-system",
		"get", "gitrepository", "flux-system",
		"-o", "jsonpath={.spec.ref.branch}",
	)
	switch {
	case err == nil:
		// fall through to the comparison below
	case isNotFoundErr(err):
		o.log.Printf("warning: flux gitrepository/flux-system not found while ensuring branch=%s", want)
		return nil
	default:
		return fmt.Errorf("read flux source branch: %w", err)
	}
	have := strings.TrimSpace(raw)
	if have == want {
		return nil
	}
	patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, want)
	if _, err = o.kubectl(ctx, 20*time.Second,
		"-n", "flux-system",
		"patch", "gitrepository", "flux-system",
		"--type=merge",
		"-p", patch,
	); err != nil {
		return fmt.Errorf("set flux source branch %q (current %q): %w", want, have, err)
	}
	o.log.Printf("updated flux source branch from %q to %q", have, want)
	return nil
}
func (o *Orchestrator) bootstrapLocal(ctx context.Context) error { func (o *Orchestrator) bootstrapLocal(ctx context.Context) error {
failures := 0 failures := 0
for _, rel := range o.cfg.LocalBootstrapPaths { for _, rel := range o.cfg.LocalBootstrapPaths {

View File

@ -45,6 +45,11 @@ type Startup struct {
type Shutdown struct { type Shutdown struct {
DefaultBudgetSeconds int `yaml:"default_budget_seconds"` DefaultBudgetSeconds int `yaml:"default_budget_seconds"`
HistoryMinSamples int `yaml:"history_min_samples"`
EmergencyBudgetSec int `yaml:"emergency_budget_seconds"`
EmergencyMinSamples int `yaml:"emergency_history_min_samples"`
EmergencySkipEtcd bool `yaml:"emergency_skip_etcd_snapshot"`
EmergencySkipDrain bool `yaml:"emergency_skip_drain"`
SkipEtcdSnapshot bool `yaml:"skip_etcd_snapshot"` SkipEtcdSnapshot bool `yaml:"skip_etcd_snapshot"`
SkipDrain bool `yaml:"skip_drain"` SkipDrain bool `yaml:"skip_drain"`
DrainParallelism int `yaml:"drain_parallelism"` DrainParallelism int `yaml:"drain_parallelism"`
@ -127,6 +132,15 @@ func (c Config) Validate() error {
if c.Shutdown.DefaultBudgetSeconds <= 0 { if c.Shutdown.DefaultBudgetSeconds <= 0 {
return fmt.Errorf("config.shutdown.default_budget_seconds must be > 0") return fmt.Errorf("config.shutdown.default_budget_seconds must be > 0")
} }
if c.Shutdown.HistoryMinSamples <= 0 {
return fmt.Errorf("config.shutdown.history_min_samples must be > 0")
}
if c.Shutdown.EmergencyBudgetSec <= 0 {
return fmt.Errorf("config.shutdown.emergency_budget_seconds must be > 0")
}
if c.Shutdown.EmergencyMinSamples <= 0 {
return fmt.Errorf("config.shutdown.emergency_history_min_samples must be > 0")
}
if c.Shutdown.DrainParallelism <= 0 { if c.Shutdown.DrainParallelism <= 0 {
return fmt.Errorf("config.shutdown.drain_parallelism must be > 0") return fmt.Errorf("config.shutdown.drain_parallelism must be > 0")
} }
@ -208,11 +222,14 @@ func defaults() Config {
"infrastructure/sources/helm", "infrastructure/sources/helm",
"infrastructure/metallb", "infrastructure/metallb",
"infrastructure/traefik", "infrastructure/traefik",
"infrastructure/cert-manager",
"infrastructure/vault-csi", "infrastructure/vault-csi",
"infrastructure/vault-injector", "infrastructure/vault-injector",
"services/vault", "services/vault",
"infrastructure/postgres", "infrastructure/postgres",
"services/gitea", "services/gitea",
"services/keycloak",
"services/oauth2-proxy",
}, },
ExcludedNamespaces: []string{ ExcludedNamespaces: []string{
"kube-system", "kube-system",
@ -239,6 +256,11 @@ func defaults() Config {
}, },
Shutdown: Shutdown{ Shutdown: Shutdown{
DefaultBudgetSeconds: 1380, DefaultBudgetSeconds: 1380,
HistoryMinSamples: 3,
EmergencyBudgetSec: 420,
EmergencyMinSamples: 3,
EmergencySkipEtcd: true,
EmergencySkipDrain: true,
DrainParallelism: 6, DrainParallelism: 6,
ScaleParallelism: 8, ScaleParallelism: 8,
SSHParallelism: 8, SSHParallelism: 8,
@ -306,6 +328,15 @@ func (c *Config) applyDefaults() {
if c.Shutdown.DefaultBudgetSeconds <= 0 { if c.Shutdown.DefaultBudgetSeconds <= 0 {
c.Shutdown.DefaultBudgetSeconds = 1380 c.Shutdown.DefaultBudgetSeconds = 1380
} }
if c.Shutdown.HistoryMinSamples <= 0 {
c.Shutdown.HistoryMinSamples = 3
}
if c.Shutdown.EmergencyBudgetSec <= 0 {
c.Shutdown.EmergencyBudgetSec = 420
}
if c.Shutdown.EmergencyMinSamples <= 0 {
c.Shutdown.EmergencyMinSamples = 3
}
if c.Shutdown.DrainParallelism <= 0 { if c.Shutdown.DrainParallelism <= 0 {
c.Shutdown.DrainParallelism = 6 c.Shutdown.DrainParallelism = 6
} }

View File

@ -89,7 +89,7 @@ func (d *Daemon) Run(ctx context.Context) error {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-t.C: case <-t.C:
budget := d.orch.EstimatedShutdownSeconds() budget := d.orch.EstimatedEmergencyShutdownSeconds()
threshold := int(math.Ceil(float64(budget) * d.cfg.UPS.RuntimeSafetyFactor)) threshold := int(math.Ceil(float64(budget) * d.cfg.UPS.RuntimeSafetyFactor))
d.exporter.UpdateBudget(budget) d.exporter.UpdateBudget(budget)
@ -177,7 +177,11 @@ func (d *Daemon) triggerShutdown(ctx context.Context, reason string) error {
d.log.Printf("warning: forward shutdown failed; falling back to local shutdown: %v", err) d.log.Printf("warning: forward shutdown failed; falling back to local shutdown: %v", err)
} }
} }
if err := d.orch.Shutdown(ctx, cluster.ShutdownOptions{Reason: reason}); err != nil { if err := d.orch.Shutdown(ctx, cluster.ShutdownOptions{
Reason: reason,
SkipDrain: d.cfg.Shutdown.EmergencySkipDrain,
SkipEtcdSnapshot: d.cfg.Shutdown.EmergencySkipEtcd,
}); err != nil {
return err return err
} }
if setErr := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShutdownComplete, reason, "daemon-local"); setErr != nil { if setErr := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShutdownComplete, reason, "daemon-local"); setErr != nil {
@ -194,11 +198,13 @@ func (d *Daemon) forwardShutdown(ctx context.Context, reason string) error {
runCtx, cancel := context.WithTimeout(ctx, timeout) runCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel() defer cancel()
remoteCmd := fmt.Sprintf( remoteCmd := fmt.Sprintf("sudo /usr/local/bin/hecate shutdown --config %q --execute --reason %q", d.cfg.Coordination.ForwardShutdownConfig, reason)
"sudo /usr/local/bin/hecate shutdown --config %q --execute --reason %q", if d.cfg.Shutdown.EmergencySkipEtcd {
d.cfg.Coordination.ForwardShutdownConfig, remoteCmd += " --skip-etcd-snapshot"
reason, }
) if d.cfg.Shutdown.EmergencySkipDrain {
remoteCmd += " --skip-drain"
}
host := d.cfg.Coordination.ForwardShutdownHost host := d.cfg.Coordination.ForwardShutdownHost
if mapped, ok := d.cfg.SSHNodeHosts[host]; ok && strings.TrimSpace(mapped) != "" { if mapped, ok := d.cfg.SSHNodeHosts[host]; ok && strings.TrimSpace(mapped) != "" {
host = strings.TrimSpace(mapped) host = strings.TrimSpace(mapped)

View File

@ -167,17 +167,51 @@ func (s *Store) loadUnlocked() ([]RunRecord, error) {
} }
func (s *Store) ShutdownP95(defaultSeconds int) int { func (s *Store) ShutdownP95(defaultSeconds int) int {
return s.shutdownP95(defaultSeconds, 1, nil)
}
// ShutdownP95WithMinSamples returns the p95 duration across all successful
// shutdown records, or defaultSeconds when fewer than minSamples qualifying
// records exist. A nil prefix list means no reason filtering is applied.
func (s *Store) ShutdownP95WithMinSamples(defaultSeconds int, minSamples int) int {
	return s.shutdownP95(defaultSeconds, minSamples, nil)
}
// ShutdownP95ByReasonPrefix returns the p95 duration across successful
// shutdown records whose reason starts with any of reasonPrefixes
// (case-insensitive), or defaultSeconds when fewer than minSamples
// matching records exist.
func (s *Store) ShutdownP95ByReasonPrefix(defaultSeconds int, minSamples int, reasonPrefixes []string) int {
	return s.shutdownP95(defaultSeconds, minSamples, reasonPrefixes)
}
func (s *Store) shutdownP95(defaultSeconds int, minSamples int, reasonPrefixes []string) int {
if minSamples <= 0 {
minSamples = 1
}
records, err := s.Load() records, err := s.Load()
if err != nil { if err != nil {
return defaultSeconds return defaultSeconds
} }
var d []int var d []int
filteredPrefixes := make([]string, 0, len(reasonPrefixes))
for _, prefix := range reasonPrefixes {
p := strings.ToLower(strings.TrimSpace(prefix))
if p != "" {
filteredPrefixes = append(filteredPrefixes, p)
}
}
matchesPrefix := func(reason string) bool {
if len(filteredPrefixes) == 0 {
return true
}
r := strings.ToLower(strings.TrimSpace(reason))
for _, prefix := range filteredPrefixes {
if strings.HasPrefix(r, prefix) {
return true
}
}
return false
}
for _, r := range records { for _, r := range records {
if r.Action == "shutdown" && r.Success && r.DurationSeconds > 0 { if r.Action == "shutdown" && r.Success && r.DurationSeconds > 0 && matchesPrefix(r.Reason) {
d = append(d, r.DurationSeconds) d = append(d, r.DurationSeconds)
} }
} }
if len(d) == 0 { if len(d) < minSamples {
return defaultSeconds return defaultSeconds
} }
sort.Ints(d) sort.Ints(d)

View File

@ -1,11 +1,13 @@
package state package state
import ( import (
"encoding/json"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"testing" "testing"
"time"
) )
func TestAcquireLockLifecycle(t *testing.T) { func TestAcquireLockLifecycle(t *testing.T) {
@ -85,3 +87,76 @@ func TestStoreLoadAutoHealsCorruptJSON(t *testing.T) {
t.Fatalf("expected 1 backup file, got %d (%v)", len(matches), matches) t.Fatalf("expected 1 backup file, got %d (%v)", len(matches), matches)
} }
} }
// TestShutdownP95WithMinSamplesFallsBackWhenHistorySparse verifies that the
// configured default budget is returned while the run history holds fewer
// successful shutdown records than the required minimum sample count.
func TestShutdownP95WithMinSamplesFallsBackWhenHistorySparse(t *testing.T) {
	path := filepath.Join(t.TempDir(), "runs.json")
	ts := time.Now().UTC()
	history := []RunRecord{
		{
			ID:              "shutdown-1",
			Action:          "shutdown",
			Reason:          "manual",
			StartedAt:       ts,
			EndedAt:         ts,
			DurationSeconds: 45,
			Success:         true,
		},
	}
	payload, err := json.Marshal(history)
	if err != nil {
		t.Fatalf("marshal records: %v", err)
	}
	if err := os.WriteFile(path, payload, 0o640); err != nil {
		t.Fatalf("write records: %v", err)
	}
	if got := New(path).ShutdownP95WithMinSamples(300, 3); got != 300 {
		t.Fatalf("expected fallback default 300 with sparse history, got %d", got)
	}
}
// TestShutdownP95ByReasonPrefixFiltersSamples verifies that only shutdown
// records whose reason matches one of the supplied prefixes contribute to
// the p95 estimate: the slow manual-maintenance run must not skew the
// UPS-filtered result.
func TestShutdownP95ByReasonPrefixFiltersSamples(t *testing.T) {
	path := filepath.Join(t.TempDir(), "runs.json")
	ts := time.Now().UTC()
	mkRecord := func(id, reason string, dur int) RunRecord {
		return RunRecord{
			ID:              id,
			Action:          "shutdown",
			Reason:          reason,
			StartedAt:       ts,
			EndedAt:         ts,
			DurationSeconds: dur,
			Success:         true,
		}
	}
	history := []RunRecord{
		mkRecord("shutdown-1", "ups-threshold target=Pyrphoros", 180),
		mkRecord("shutdown-2", "ups-threshold target=Pyrphoros", 220),
		mkRecord("shutdown-3", "manual-maintenance", 1200),
	}
	payload, err := json.Marshal(history)
	if err != nil {
		t.Fatalf("marshal records: %v", err)
	}
	if err := os.WriteFile(path, payload, 0o640); err != nil {
		t.Fatalf("write records: %v", err)
	}
	if got := New(path).ShutdownP95ByReasonPrefix(420, 2, []string{"ups-"}); got != 220 {
		t.Fatalf("expected filtered p95 of 220, got %d", got)
	}
}