From 3af6fe9f6f0a5d1d4e8c1c6a9f44c034323757a2 Mon Sep 17 00:00:00 2001
From: Brad Stein
Date: Sat, 4 Apr 2026 12:44:15 -0300
Subject: [PATCH] hecate(startup): add coordinated intent guards and resilient
 recovery SSH

---
 README.md                        |  13 +++
 cmd/hecate/main.go               | 114 ++++++++++++++------
 configs/hecate.example.yaml      |  17 ++-
 configs/hecate.tethys.yaml       |  41 +++++++-
 configs/hecate.titan-db.yaml     |  51 ++++++++-
 internal/cluster/orchestrator.go | 172 ++++++++++++++++++++++++++++---
 internal/config/config.go        |  82 ++++++++++++---
 internal/config/config_test.go   |   8 ++
 internal/service/daemon.go       |  67 ++++++++++--
 internal/state/intent.go         |  69 +++++++++++++
 internal/state/intent_test.go    |  30 ++++++
 scripts/hecate-drills.sh         |  41 +++++++-
 12 files changed, 634 insertions(+), 71 deletions(-)
 create mode 100644 internal/state/intent.go
 create mode 100644 internal/state/intent_test.go

diff --git a/README.md b/README.md
index 8f68841..8e69350 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,11 @@ Hecate runs outside the cluster under systemd, so it can always orchestrate brin
 - `hecate daemon --config /etc/hecate/hecate.yaml`
 - `hecate status --config /etc/hecate/hecate.yaml`
 
+Key startup guards:
+- Startup is blocked on hosts configured as `coordination.role: peer` (unless `--allow-peer-startup` is used intentionally).
+- Startup is blocked while the UPS is on battery by default (unless `--allow-on-battery` or `coordination.allow_startup_on_battery: true` is set).
+- Startup is blocked when a shutdown intent is active (`/var/lib/hecate/intent.json`).
+
 ## Manual install on titan-db
 
 ```bash
@@ -61,12 +66,19 @@ sudo systemctl start hecate-bootstrap.service
 - `systemctl start/stop k3s-agent`
 - UPS telemetry available via NUT (`upsc`)
 
+Optional SSH jump/bastion:
+- Set `ssh_jump_host` (and optional `ssh_jump_user`) to route node SSH through a jump host like `titan-jh`; Hecate falls back to direct SSH automatically if jump routing is unavailable.
+- Set `ssh_port`, `ssh_identity_file`, and `ssh_node_hosts` so root-run systemd actions can reach node SSH daemons during cold-start recovery.
+- Use `ssh_node_users` for per-node username overrides (for example `titan-24: tethys`).
+- Use `ssh_managed_nodes` to limit host-level SSH start/stop actions to nodes Hecate can authenticate to.
+
 ## Multi-UPS topology
 
 Recommended:
 - `titan-db` runs Hecate as the shutdown coordinator with UPS `Pyrphoros` (`pyrphoros@localhost`).
 - `tethys` runs Hecate as a peer with UPS `Statera` (`statera@localhost`) and forwards shutdown triggers to `titan-db`.
 - If forwarding fails, fallback local shutdown can remain enabled.
+- Use `coordination.role: coordinator` on `titan-db` and `coordination.role: peer` on `tethys`.
 
 ## Config
 
@@ -86,6 +98,7 @@ Power metrics:
 ## Notes
 
 - Default behavior for `startup` and `shutdown` is dry-run unless `--execute` is set.
+- Hecate tracks intent in `/var/lib/hecate/intent.json` (`normal`, `startup_in_progress`, `shutting_down`, `shutdown_complete`) so startup and shutdown do not fight each other.
 - `hecate-bootstrap.service` is enabled to run at host boot and perform staged startup automatically.
 - `HECATE_ENABLE_BOOTSTRAP=1` enables `hecate-bootstrap.service` (recommended on `titan-db`; keep disabled on non-coordinator hosts).
 - `hecate-update.timer` runs on boot and periodically to pull latest `main` and reinstall Hecate declaratively.
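The guard bullets and the Notes entry above reference `/var/lib/hecate/intent.json`. Roughly, that file looks like the following once the daemon has recorded a shutdown; the field names come from `internal/state/intent.go` later in this patch, and the concrete values are illustrative only:

```json
{
  "state": "shutting_down",
  "reason": "ups-threshold",
  "source": "daemon",
  "updated_at": "2026-04-04T15:12:44Z"
}
```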
diff --git a/cmd/hecate/main.go b/cmd/hecate/main.go index 67a4ddc..fac5f4f 100644 --- a/cmd/hecate/main.go +++ b/cmd/hecate/main.go @@ -8,6 +8,7 @@ import ( "log" "os" "os/signal" + "strings" "syscall" "time" @@ -63,19 +64,38 @@ func runStartup(logger *log.Logger, args []string) error { execute := fs.Bool("execute", false, "Actually execute changes (default dry-run)") forceBranch := fs.String("force-flux-branch", "", "Patch Flux source branch before resume") skipLocalBootstrap := fs.Bool("skip-local-bootstrap", false, "Skip local fallback bootstrap applies") + allowPeerStartup := fs.Bool("allow-peer-startup", false, "Allow startup to run on a peer instance") + allowOnBattery := fs.Bool("allow-on-battery", false, "Allow startup when UPS reports on-battery") + reason := fs.String("reason", "manual-startup", "Startup reason for run history") _ = fs.Parse(args) - _, orch, err := buildOrchestrator(logger, *configPath, !*execute) + cfg, orch, err := buildOrchestrator(logger, *configPath, !*execute) if err != nil { return err } + if *execute { + if cfg.Coordination.Role == "peer" && !*allowPeerStartup { + return fmt.Errorf("startup blocked: this instance is configured as role=peer (use --allow-peer-startup to override)") + } + if cfg.UPS.Enabled && !cfg.Coordination.AllowStartupOnBattery && !*allowOnBattery { + targets, targetErr := buildUPSTargets(cfg) + if targetErr != nil { + return targetErr + } + checkCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if err := ensureStartupPowerSafe(checkCtx, targets); err != nil { + return err + } + } + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() return orch.Startup(ctx, cluster.StartupOptions{ ForceFluxBranch: *forceBranch, SkipLocalBootstrap: *skipLocalBootstrap, - Reason: "manual-startup", + Reason: *reason, }) } @@ -115,34 +135,9 @@ func runDaemon(logger *log.Logger, args []string) error { if !cfg.UPS.Enabled { return fmt.Errorf("UPS monitoring is disabled in config") } - targets := make([]service.Target, 0, len(cfg.UPS.Targets)+1) - switch cfg.UPS.Provider { - case "nut": - if len(cfg.UPS.Targets) == 0 { - target := cfg.UPS.Target - if target == "" { - return fmt.Errorf("ups target must be set") - } - targets = append(targets, service.Target{ - Name: "primary", - Target: target, - Provider: ups.NewNUTProvider(target), - }) - } else { - for idx, t := range cfg.UPS.Targets { - name := t.Name - if name == "" { - name = fmt.Sprintf("target-%d", idx+1) - } - targets = append(targets, service.Target{ - Name: name, - Target: t.Target, - Provider: ups.NewNUTProvider(t.Target), - }) - } - } - default: - return fmt.Errorf("unsupported UPS provider: %s", cfg.UPS.Provider) + targets, err := buildUPSTargets(cfg) + if err != nil { + return err } d := service.NewDaemon(cfg, orch, targets, logger) @@ -175,10 +170,69 @@ func runStatus(logger *log.Logger, args []string) error { logger.Printf("expected_flux_branch=%s", cfg.ExpectedFluxBranch) logger.Printf("control_planes=%v", cfg.ControlPlanes) logger.Printf("estimated_shutdown_budget_seconds=%d", orch.EstimatedShutdownSeconds()) + intent, intentErr := state.ReadIntent(cfg.State.IntentPath) + if intentErr != nil { + logger.Printf("intent_read_error=%v", intentErr) + } else if intent.State == "" { + logger.Printf("intent=none") + } else { + logger.Printf("intent=%s reason=%q source=%s updated_at=%s", + intent.State, intent.Reason, intent.Source, intent.UpdatedAt.Format(time.RFC3339)) + } logger.Printf("last_run=%s", last) return nil } +func 
buildUPSTargets(cfg config.Config) ([]service.Target, error) { + targets := make([]service.Target, 0, len(cfg.UPS.Targets)+1) + switch cfg.UPS.Provider { + case "nut": + if len(cfg.UPS.Targets) == 0 { + target := cfg.UPS.Target + if target == "" { + return nil, fmt.Errorf("ups target must be set") + } + targets = append(targets, service.Target{ + Name: "primary", + Target: target, + Provider: ups.NewNUTProvider(target), + }) + } else { + for idx, t := range cfg.UPS.Targets { + name := t.Name + if name == "" { + name = fmt.Sprintf("target-%d", idx+1) + } + targets = append(targets, service.Target{ + Name: name, + Target: t.Target, + Provider: ups.NewNUTProvider(t.Target), + }) + } + } + default: + return nil, fmt.Errorf("unsupported UPS provider: %s", cfg.UPS.Provider) + } + return targets, nil +} + +func ensureStartupPowerSafe(ctx context.Context, targets []service.Target) error { + onBatteryTargets := []string{} + for _, t := range targets { + sample, err := t.Provider.Read(ctx) + if err != nil { + return fmt.Errorf("startup blocked: unable to verify UPS target %s (%s): %w", t.Name, t.Target, err) + } + if sample.OnBattery { + onBatteryTargets = append(onBatteryTargets, fmt.Sprintf("%s(status=%s runtime_s=%d)", t.Name, sample.RawStatus, sample.RuntimeSeconds)) + } + } + if len(onBatteryTargets) > 0 { + return fmt.Errorf("startup blocked: UPS is on battery for %s", strings.Join(onBatteryTargets, ", ")) + } + return nil +} + func buildOrchestrator(logger *log.Logger, cfgPath string, dryRun bool) (config.Config, *cluster.Orchestrator, error) { cfg, err := config.Load(cfgPath) if err != nil { diff --git a/configs/hecate.example.yaml b/configs/hecate.example.yaml index fbcbbbd..391c61c 100644 --- a/configs/hecate.example.yaml +++ b/configs/hecate.example.yaml @@ -1,6 +1,13 @@ # /etc/hecate/hecate.yaml -kubeconfig: /home/atlas/.kube/config +kubeconfig: /etc/hecate/kubeconfig ssh_user: atlas +ssh_port: 2277 +ssh_identity_file: /home/atlas/.ssh/id_ed25519 +ssh_node_hosts: {} +ssh_node_users: {} +ssh_managed_nodes: [] +ssh_jump_host: "" +ssh_jump_user: "" iac_repo_path: /opt/titan-iac expected_flux_branch: main control_planes: @@ -10,7 +17,7 @@ control_planes: workers: [] local_bootstrap_paths: - infrastructure/core - - infrastructure/flux-system + - clusters/atlas/flux-system - infrastructure/sources/helm - infrastructure/metallb - infrastructure/traefik @@ -31,6 +38,9 @@ excluded_namespaces: - vault - postgres - maintenance +startup: + api_wait_seconds: 1200 + api_poll_seconds: 2 shutdown: default_budget_seconds: 300 skip_etcd_snapshot: false @@ -57,6 +67,8 @@ coordination: forward_shutdown_config: /etc/hecate/hecate.yaml fallback_local_shutdown: true command_timeout_seconds: 25 + role: coordinator + allow_startup_on_battery: false metrics: enabled: true bind_addr: 0.0.0.0:9560 @@ -65,3 +77,4 @@ state: dir: /var/lib/hecate run_history_path: /var/lib/hecate/runs.json lock_path: /var/lib/hecate/hecate.lock + intent_path: /var/lib/hecate/intent.json diff --git a/configs/hecate.tethys.yaml b/configs/hecate.tethys.yaml index 519076e..c4ad44e 100644 --- a/configs/hecate.tethys.yaml +++ b/configs/hecate.tethys.yaml @@ -1,6 +1,39 @@ # /etc/hecate/hecate.yaml for titan-24 (tethys forwarder) -kubeconfig: /home/tethys/.kube/config +kubeconfig: /etc/hecate/kubeconfig ssh_user: atlas +ssh_port: 2277 +ssh_identity_file: /home/tethys/.ssh/id_ed25519 +ssh_node_hosts: + titan-db: 192.168.22.7 + titan-0a: 192.168.22.11 + titan-0b: 192.168.22.12 + titan-0c: 192.168.22.13 + titan-04: 192.168.22.30 + titan-05: 
192.168.22.31 + titan-06: 192.168.22.32 + titan-07: 192.168.22.33 + titan-08: 192.168.22.34 + titan-09: 192.168.22.35 + titan-10: 192.168.22.36 + titan-11: 192.168.22.37 + titan-12: 192.168.22.40 + titan-13: 192.168.22.41 + titan-14: 192.168.22.42 + titan-15: 192.168.22.43 + titan-17: 192.168.22.45 + titan-18: 192.168.22.46 + titan-19: 192.168.22.47 + titan-20: 192.168.22.20 + titan-21: 192.168.22.21 + titan-22: 192.168.22.22 + titan-24: 192.168.22.26 +ssh_node_users: + titan-24: tethys +ssh_managed_nodes: + - titan-db + - titan-24 +ssh_jump_host: "" +ssh_jump_user: "" iac_repo_path: /opt/titan-iac expected_flux_branch: main control_planes: @@ -22,6 +55,9 @@ excluded_namespaces: - vault - postgres - maintenance +startup: + api_wait_seconds: 1200 + api_poll_seconds: 2 shutdown: default_budget_seconds: 300 skip_etcd_snapshot: false @@ -46,6 +82,8 @@ coordination: forward_shutdown_config: /etc/hecate/hecate.yaml fallback_local_shutdown: false command_timeout_seconds: 25 + role: peer + allow_startup_on_battery: false metrics: enabled: true bind_addr: 0.0.0.0:9560 @@ -54,3 +92,4 @@ state: dir: /var/lib/hecate run_history_path: /var/lib/hecate/runs.json lock_path: /var/lib/hecate/hecate.lock + intent_path: /var/lib/hecate/intent.json diff --git a/configs/hecate.titan-db.yaml b/configs/hecate.titan-db.yaml index 3ed41d2..c9050f6 100644 --- a/configs/hecate.titan-db.yaml +++ b/configs/hecate.titan-db.yaml @@ -1,6 +1,47 @@ # /etc/hecate/hecate.yaml for titan-db (coordinator) -kubeconfig: /home/atlas/.kube/config +kubeconfig: /etc/hecate/kubeconfig ssh_user: atlas +ssh_port: 2277 +ssh_identity_file: /home/atlas/.ssh/id_ed25519 +ssh_node_hosts: + titan-db: 192.168.22.7 + titan-0a: 192.168.22.11 + titan-0b: 192.168.22.12 + titan-0c: 192.168.22.13 + titan-04: 192.168.22.30 + titan-05: 192.168.22.31 + titan-06: 192.168.22.32 + titan-07: 192.168.22.33 + titan-08: 192.168.22.34 + titan-09: 192.168.22.35 + titan-10: 192.168.22.36 + titan-11: 192.168.22.37 + titan-12: 192.168.22.40 + titan-13: 192.168.22.41 + titan-14: 192.168.22.42 + titan-15: 192.168.22.43 + titan-17: 192.168.22.45 + titan-18: 192.168.22.46 + titan-19: 192.168.22.47 + titan-20: 192.168.22.20 + titan-21: 192.168.22.21 + titan-22: 192.168.22.22 + titan-24: 192.168.22.26 +ssh_node_users: + titan-24: tethys +ssh_managed_nodes: + - titan-db + - titan-0a + - titan-0b + - titan-0c + - titan-12 + - titan-14 + - titan-15 + - titan-17 + - titan-18 + - titan-22 +ssh_jump_host: "" +ssh_jump_user: "" iac_repo_path: /opt/titan-iac expected_flux_branch: main control_planes: @@ -10,7 +51,7 @@ control_planes: workers: [] local_bootstrap_paths: - infrastructure/core - - infrastructure/flux-system + - clusters/atlas/flux-system - infrastructure/sources/helm - infrastructure/metallb - infrastructure/traefik @@ -31,6 +72,9 @@ excluded_namespaces: - vault - postgres - maintenance +startup: + api_wait_seconds: 1200 + api_poll_seconds: 2 shutdown: default_budget_seconds: 300 skip_etcd_snapshot: false @@ -56,6 +100,8 @@ coordination: forward_shutdown_config: /etc/hecate/hecate.yaml fallback_local_shutdown: true command_timeout_seconds: 25 + role: coordinator + allow_startup_on_battery: false metrics: enabled: true bind_addr: 0.0.0.0:9560 @@ -64,3 +110,4 @@ state: dir: /var/lib/hecate run_history_path: /var/lib/hecate/runs.json lock_path: /var/lib/hecate/hecate.lock + intent_path: /var/lib/hecate/intent.json diff --git a/internal/cluster/orchestrator.go b/internal/cluster/orchestrator.go index e0b14ab..74effd1 100644 --- a/internal/cluster/orchestrator.go +++ 
b/internal/cluster/orchestrator.go @@ -73,20 +73,47 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er } defer o.finalizeRecord(&record, &err) + if !o.runner.DryRun { + currentIntent, readErr := state.ReadIntent(o.cfg.State.IntentPath) + if readErr != nil { + return fmt.Errorf("read startup intent: %w", readErr) + } + if currentIntent.State == state.IntentShuttingDown { + return fmt.Errorf("startup blocked: shutdown intent is active (%s)", currentIntent.Reason) + } + if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentStartupInProgress, opts.Reason, "startup"); writeErr != nil { + return fmt.Errorf("set startup intent: %w", writeErr) + } + defer func() { + if err == nil { + if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, opts.Reason, "startup"); writeErr != nil { + o.log.Printf("warning: write startup completion intent failed: %v", writeErr) + } + } + }() + } + + o.log.Printf("startup control-planes=%s", strings.Join(o.cfg.ControlPlanes, ",")) + + o.reportFluxSource(ctx, opts.ForceFluxBranch) + o.startControlPlanes(ctx, o.cfg.ControlPlanes) + + apiPoll := time.Duration(o.cfg.Startup.APIPollSeconds) * time.Second + apiAttempts := o.cfg.Startup.APIWaitSeconds / o.cfg.Startup.APIPollSeconds + if apiAttempts < 1 { + apiAttempts = 1 + } + if err := o.waitForAPI(ctx, apiAttempts, apiPoll); err != nil { + return err + } + workers, err := o.effectiveWorkers(ctx) if err != nil { return err } - o.log.Printf("startup control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ",")) - - o.reportFluxSource(ctx, opts.ForceFluxBranch) - o.startControlPlanes(ctx, o.cfg.ControlPlanes) + o.log.Printf("startup workers=%s", strings.Join(workers, ",")) o.startWorkers(ctx, workers) - if err := o.waitForAPI(ctx, 120, 2*time.Second); err != nil { - return err - } - if opts.ForceFluxBranch != "" { patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, opts.ForceFluxBranch) if _, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "gitrepository", "flux-system", "--type=merge", "-p", patch); err != nil { @@ -167,6 +194,20 @@ func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err StartedAt: time.Now().UTC(), } defer o.finalizeRecord(&record, &err) + if !o.runner.DryRun { + if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentShuttingDown, opts.Reason, "shutdown"); writeErr != nil { + return fmt.Errorf("set shutdown intent: %w", writeErr) + } + defer func() { + final := state.IntentShuttingDown + if err == nil { + final = state.IntentShutdownComplete + } + if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, final, opts.Reason, "shutdown"); writeErr != nil { + o.log.Printf("warning: write shutdown completion intent failed: %v", writeErr) + } + }() + } workers, err := o.effectiveWorkers(ctx) if err != nil { @@ -322,6 +363,10 @@ func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) { for _, n := range workers { + if !o.sshManaged(n) { + o.log.Printf("skip stop k3s-agent on %s: node not in ssh_managed_nodes", n) + continue + } o.bestEffort("stop k3s-agent on "+n, func() error { _, err := o.ssh(ctx, n, "sudo systemctl stop k3s-agent || true") return err @@ -331,6 +376,10 @@ func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) { func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) { for _, n 
:= range workers { + if !o.sshManaged(n) { + o.log.Printf("skip start k3s-agent on %s: node not in ssh_managed_nodes", n) + continue + } o.bestEffort("start k3s-agent on "+n, func() error { _, err := o.ssh(ctx, n, "sudo systemctl start k3s-agent || true") return err @@ -340,6 +389,10 @@ func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) { func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) { for _, n := range cps { + if !o.sshManaged(n) { + o.log.Printf("skip stop k3s on %s: node not in ssh_managed_nodes", n) + continue + } o.bestEffort("stop k3s on "+n, func() error { _, err := o.ssh(ctx, n, "sudo systemctl stop k3s || true") return err @@ -349,6 +402,10 @@ func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) { func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) { for _, n := range cps { + if !o.sshManaged(n) { + o.log.Printf("skip start k3s on %s: node not in ssh_managed_nodes", n) + continue + } o.bestEffort("start k3s on "+n, func() error { _, err := o.ssh(ctx, n, "sudo systemctl start k3s || true") return err @@ -357,6 +414,9 @@ func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) { } func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error { + if !o.sshManaged(node) { + return fmt.Errorf("cannot run etcd snapshot on %s: node not in ssh_managed_nodes", node) + } name := "pre-shutdown-" + time.Now().UTC().Format("20060102-150405") _, err := o.ssh(ctx, node, "sudo k3s etcd-snapshot save --name "+name) return err @@ -404,11 +464,18 @@ func (o *Orchestrator) bootstrapLocal(ctx context.Context) error { for _, rel := range o.cfg.LocalBootstrapPaths { full := filepath.Join(o.cfg.IACRepoPath, rel) o.log.Printf("local bootstrap apply -k %s", full) - if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-k", full); err != nil { - failures++ - o.log.Printf("warning: local bootstrap apply failed at %s: %v", full, err) + if o.runner.DryRun { continue } + if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-k", full); err != nil { + o.log.Printf("warning: local bootstrap apply failed at %s: %v", full, err) + o.log.Printf("local bootstrap fallback render/apply with LoadRestrictionsNone for %s", full) + if fallbackErr := o.applyKustomizeFallback(ctx, full); fallbackErr != nil { + failures++ + o.log.Printf("warning: local bootstrap fallback failed at %s: %v", full, fallbackErr) + continue + } + } } if failures == len(o.cfg.LocalBootstrapPaths) { return fmt.Errorf("local bootstrap apply failed for every configured path (%d total)", failures) @@ -416,6 +483,14 @@ func (o *Orchestrator) bootstrapLocal(ctx context.Context) error { return nil } +func (o *Orchestrator) applyKustomizeFallback(ctx context.Context, full string) error { + cmd := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q | kubectl apply -f -", full) + if _, err := o.runSensitive(ctx, 3*time.Minute, "sh", "-lc", cmd); err != nil { + return err + } + return nil +} + func (o *Orchestrator) waitForFluxSourceReady(ctx context.Context, window time.Duration) (bool, error) { if o.runner.DryRun { return true, nil @@ -485,11 +560,66 @@ func (o *Orchestrator) kubectl(ctx context.Context, timeout time.Duration, args } func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (string, error) { - target := node - if o.cfg.SSHUser != "" { - target = o.cfg.SSHUser + "@" + node + host := node + if mapped, ok := o.cfg.SSHNodeHosts[node]; ok && strings.TrimSpace(mapped) != 
"" { + host = strings.TrimSpace(mapped) } - return o.run(ctx, 45*time.Second, "ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=8", target, command) + sshUser := o.cfg.SSHUser + if override, ok := o.cfg.SSHNodeUsers[node]; ok && strings.TrimSpace(override) != "" { + sshUser = strings.TrimSpace(override) + } + target := host + if sshUser != "" { + target = sshUser + "@" + host + } + baseArgs := []string{ + "-o", "BatchMode=yes", + "-o", "ConnectTimeout=8", + "-o", "StrictHostKeyChecking=accept-new", + } + if o.cfg.SSHIdentityFile != "" { + baseArgs = append(baseArgs, "-i", o.cfg.SSHIdentityFile) + } + if o.cfg.SSHPort > 0 { + baseArgs = append(baseArgs, "-p", strconv.Itoa(o.cfg.SSHPort)) + } + attempts := make([][]string, 0, 2) + attemptNames := make([]string, 0, 2) + if o.cfg.SSHJumpHost != "" { + jump := o.cfg.SSHJumpHost + if o.cfg.SSHJumpUser != "" { + jump = o.cfg.SSHJumpUser + "@" + jump + } + if o.cfg.SSHPort > 0 && !strings.Contains(jump, ":") { + jump = fmt.Sprintf("%s:%d", jump, o.cfg.SSHPort) + } + withJump := append([]string{}, baseArgs...) + withJump = append(withJump, "-J", jump, target, command) + attempts = append(attempts, withJump) + attemptNames = append(attemptNames, "jump") + } + direct := append([]string{}, baseArgs...) + direct = append(direct, target, command) + attempts = append(attempts, direct) + attemptNames = append(attemptNames, "direct") + + var lastOut string + var lastErr error + for i, args := range attempts { + out, err := o.run(ctx, 45*time.Second, "ssh", args...) + if err == nil { + if i > 0 { + o.log.Printf("warning: ssh %s path failed for %s, using %s path", attemptNames[i-1], node, attemptNames[i]) + } + return out, nil + } + lastOut = out + lastErr = err + if i < len(attempts)-1 { + o.log.Printf("warning: ssh %s path failed for %s: %v; trying %s path", attemptNames[i], node, err, attemptNames[i+1]) + } + } + return lastOut, lastErr } func (o *Orchestrator) run(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { @@ -534,6 +664,18 @@ func lines(in string) []string { return out } +func (o *Orchestrator) sshManaged(node string) bool { + if len(o.cfg.SSHManagedNodes) == 0 { + return true + } + for _, allowed := range o.cfg.SSHManagedNodes { + if strings.TrimSpace(allowed) == node { + return true + } + } + return false +} + func (o *Orchestrator) bestEffort(name string, fn func() error) { if err := fn(); err != nil { o.log.Printf("warning: %s: %v", name, err) diff --git a/internal/config/config.go b/internal/config/config.go index 26d75a1..09d0d65 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -8,19 +8,32 @@ import ( ) type Config struct { - Kubeconfig string `yaml:"kubeconfig"` - SSHUser string `yaml:"ssh_user"` - IACRepoPath string `yaml:"iac_repo_path"` - ExpectedFluxBranch string `yaml:"expected_flux_branch"` - ControlPlanes []string `yaml:"control_planes"` - Workers []string `yaml:"workers"` - LocalBootstrapPaths []string `yaml:"local_bootstrap_paths"` - ExcludedNamespaces []string `yaml:"excluded_namespaces"` - Shutdown Shutdown `yaml:"shutdown"` - UPS UPS `yaml:"ups"` - Coordination Coordination `yaml:"coordination"` - Metrics Metrics `yaml:"metrics"` - State State `yaml:"state"` + Kubeconfig string `yaml:"kubeconfig"` + SSHUser string `yaml:"ssh_user"` + SSHPort int `yaml:"ssh_port"` + SSHIdentityFile string `yaml:"ssh_identity_file"` + SSHNodeHosts map[string]string `yaml:"ssh_node_hosts"` + SSHNodeUsers map[string]string `yaml:"ssh_node_users"` + SSHManagedNodes []string 
`yaml:"ssh_managed_nodes"` + SSHJumpHost string `yaml:"ssh_jump_host"` + SSHJumpUser string `yaml:"ssh_jump_user"` + IACRepoPath string `yaml:"iac_repo_path"` + ExpectedFluxBranch string `yaml:"expected_flux_branch"` + ControlPlanes []string `yaml:"control_planes"` + Workers []string `yaml:"workers"` + LocalBootstrapPaths []string `yaml:"local_bootstrap_paths"` + ExcludedNamespaces []string `yaml:"excluded_namespaces"` + Startup Startup `yaml:"startup"` + Shutdown Shutdown `yaml:"shutdown"` + UPS UPS `yaml:"ups"` + Coordination Coordination `yaml:"coordination"` + Metrics Metrics `yaml:"metrics"` + State State `yaml:"state"` +} + +type Startup struct { + APIWaitSeconds int `yaml:"api_wait_seconds"` + APIPollSeconds int `yaml:"api_poll_seconds"` } type Shutdown struct { @@ -55,6 +68,8 @@ type Coordination struct { ForwardShutdownConfig string `yaml:"forward_shutdown_config"` FallbackLocalShutdown bool `yaml:"fallback_local_shutdown"` CommandTimeoutSeconds int `yaml:"command_timeout_seconds"` + Role string `yaml:"role"` + AllowStartupOnBattery bool `yaml:"allow_startup_on_battery"` } type Metrics struct { @@ -67,6 +82,7 @@ type State struct { Dir string `yaml:"dir"` RunHistoryPath string `yaml:"run_history_path"` LockPath string `yaml:"lock_path"` + IntentPath string `yaml:"intent_path"` } func Load(path string) (Config, error) { @@ -100,6 +116,15 @@ func (c Config) Validate() error { if c.Shutdown.DefaultBudgetSeconds <= 0 { return fmt.Errorf("config.shutdown.default_budget_seconds must be > 0") } + if c.Startup.APIWaitSeconds <= 0 { + return fmt.Errorf("config.startup.api_wait_seconds must be > 0") + } + if c.Startup.APIPollSeconds <= 0 { + return fmt.Errorf("config.startup.api_poll_seconds must be > 0") + } + if c.SSHPort <= 0 || c.SSHPort > 65535 { + return fmt.Errorf("config.ssh_port must be in range 1-65535") + } if c.UPS.Enabled { if c.UPS.Provider == "" { return fmt.Errorf("config.ups.provider must not be empty when ups is enabled") @@ -118,9 +143,15 @@ func (c Config) Validate() error { return fmt.Errorf("config.coordination.forward_shutdown_config must not be empty when forward_shutdown_host is set") } } + if c.Coordination.Role != "coordinator" && c.Coordination.Role != "peer" { + return fmt.Errorf("config.coordination.role must be coordinator or peer") + } if c.State.RunHistoryPath == "" || c.State.LockPath == "" { return fmt.Errorf("config.state.run_history_path and config.state.lock_path must not be empty") } + if c.State.IntentPath == "" { + return fmt.Errorf("config.state.intent_path must not be empty") + } return nil } @@ -128,10 +159,11 @@ func defaults() Config { c := Config{ IACRepoPath: "/opt/titan-iac", ExpectedFluxBranch: "main", + SSHPort: 2277, ControlPlanes: []string{"titan-0a", "titan-0b", "titan-0c"}, LocalBootstrapPaths: []string{ "infrastructure/core", - "infrastructure/flux-system", + "clusters/atlas/flux-system", "infrastructure/sources/helm", "infrastructure/metallb", "infrastructure/traefik", @@ -154,6 +186,10 @@ func defaults() Config { "postgres", "maintenance", }, + Startup: Startup{ + APIWaitSeconds: 1200, + APIPollSeconds: 2, + }, Shutdown: Shutdown{ DefaultBudgetSeconds: 300, PoweroffEnabled: true, @@ -172,6 +208,8 @@ func defaults() Config { ForwardShutdownConfig: "/etc/hecate/hecate.yaml", FallbackLocalShutdown: true, CommandTimeoutSeconds: 25, + Role: "coordinator", + AllowStartupOnBattery: false, }, Metrics: Metrics{ Enabled: true, @@ -182,6 +220,7 @@ func defaults() Config { Dir: "/var/lib/hecate", RunHistoryPath: "/var/lib/hecate/runs.json", 
LockPath: "/var/lib/hecate/hecate.lock", + IntentPath: "/var/lib/hecate/intent.json", }, } c.applyDefaults() @@ -195,6 +234,15 @@ func (c *Config) applyDefaults() { if c.IACRepoPath == "" { c.IACRepoPath = "/opt/titan-iac" } + if c.Startup.APIWaitSeconds <= 0 { + c.Startup.APIWaitSeconds = 1200 + } + if c.Startup.APIPollSeconds <= 0 { + c.Startup.APIPollSeconds = 2 + } + if c.SSHPort <= 0 { + c.SSHPort = 2277 + } if c.Shutdown.DefaultBudgetSeconds <= 0 { c.Shutdown.DefaultBudgetSeconds = 300 } @@ -219,6 +267,9 @@ func (c *Config) applyDefaults() { if c.Coordination.CommandTimeoutSeconds <= 0 { c.Coordination.CommandTimeoutSeconds = 25 } + if c.Coordination.Role == "" { + c.Coordination.Role = "coordinator" + } if c.Metrics.BindAddr == "" { c.Metrics.BindAddr = "0.0.0.0:9560" } @@ -234,4 +285,7 @@ func (c *Config) applyDefaults() { if c.State.LockPath == "" { c.State.LockPath = "/var/lib/hecate/hecate.lock" } + if c.State.IntentPath == "" { + c.State.IntentPath = "/var/lib/hecate/intent.json" + } } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 29e8e3b..bce7fd9 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -47,3 +47,11 @@ func TestValidateForwardShutdownRequiresConfigPath(t *testing.T) { t.Fatalf("expected validation error for missing forward_shutdown_config") } } + +func TestValidateRejectsUnknownRole(t *testing.T) { + cfg := defaults() + cfg.Coordination.Role = "unknown" + if err := cfg.Validate(); err == nil { + t.Fatalf("expected validation error for unknown coordination role") + } +} diff --git a/internal/service/daemon.go b/internal/service/daemon.go index 4e648d1..5f836fa 100644 --- a/internal/service/daemon.go +++ b/internal/service/daemon.go @@ -7,12 +7,14 @@ import ( "math" "net/http" "os/exec" + "strconv" "strings" "time" "scm.bstein.dev/bstein/hecate/internal/cluster" "scm.bstein.dev/bstein/hecate/internal/config" "scm.bstein.dev/bstein/hecate/internal/metrics" + "scm.bstein.dev/bstein/hecate/internal/state" "scm.bstein.dev/bstein/hecate/internal/ups" ) @@ -149,10 +151,22 @@ func (d *Daemon) Run(ctx context.Context) error { } func (d *Daemon) triggerShutdown(ctx context.Context, reason string) error { + intent, err := state.ReadIntent(d.cfg.State.IntentPath) + if err == nil && intent.State == state.IntentShuttingDown { + d.log.Printf("shutdown already in progress; skipping duplicate trigger: %s", reason) + return nil + } + if err := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShuttingDown, reason, "daemon"); err != nil { + d.log.Printf("warning: unable to persist shutdown intent before trigger: %v", err) + } + d.log.Printf("triggering shutdown: %s", reason) d.exporter.MarkShutdown(reason) if d.cfg.Coordination.ForwardShutdownHost != "" { if err := d.forwardShutdown(ctx, reason); err == nil { + if setErr := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShutdownComplete, reason, "daemon-forwarded"); setErr != nil { + d.log.Printf("warning: unable to persist forwarded shutdown completion intent: %v", setErr) + } d.log.Printf("shutdown trigger forwarded to %s", d.cfg.Coordination.ForwardShutdownHost) return nil } else if !d.cfg.Coordination.FallbackLocalShutdown { @@ -161,14 +175,16 @@ func (d *Daemon) triggerShutdown(ctx context.Context, reason string) error { d.log.Printf("warning: forward shutdown failed; falling back to local shutdown: %v", err) } } - return d.orch.Shutdown(ctx, cluster.ShutdownOptions{Reason: reason}) + if err := d.orch.Shutdown(ctx, 
cluster.ShutdownOptions{Reason: reason}); err != nil { + return err + } + if setErr := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShutdownComplete, reason, "daemon-local"); setErr != nil { + d.log.Printf("warning: unable to persist local shutdown completion intent: %v", setErr) + } + return nil } func (d *Daemon) forwardShutdown(ctx context.Context, reason string) error { - userHost := d.cfg.Coordination.ForwardShutdownHost - if d.cfg.Coordination.ForwardShutdownUser != "" { - userHost = d.cfg.Coordination.ForwardShutdownUser + "@" + userHost - } timeout := time.Duration(d.cfg.Coordination.CommandTimeoutSeconds) * time.Second if timeout <= 0 { timeout = 25 * time.Second @@ -181,7 +197,46 @@ func (d *Daemon) forwardShutdown(ctx context.Context, reason string) error { d.cfg.Coordination.ForwardShutdownConfig, reason, ) - cmd := exec.CommandContext(runCtx, "ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=8", userHost, remoteCmd) + host := d.cfg.Coordination.ForwardShutdownHost + if mapped, ok := d.cfg.SSHNodeHosts[host]; ok && strings.TrimSpace(mapped) != "" { + host = strings.TrimSpace(mapped) + } + user := d.cfg.Coordination.ForwardShutdownUser + if user == "" { + if override, ok := d.cfg.SSHNodeUsers[d.cfg.Coordination.ForwardShutdownHost]; ok && strings.TrimSpace(override) != "" { + user = strings.TrimSpace(override) + } else { + user = d.cfg.SSHUser + } + } + + target := host + if user != "" { + target = user + "@" + host + } + args := []string{ + "-o", "BatchMode=yes", + "-o", "ConnectTimeout=8", + "-o", "StrictHostKeyChecking=accept-new", + } + if d.cfg.SSHIdentityFile != "" { + args = append(args, "-i", d.cfg.SSHIdentityFile) + } + if d.cfg.SSHPort > 0 { + args = append(args, "-p", strconv.Itoa(d.cfg.SSHPort)) + } + if d.cfg.SSHJumpHost != "" { + jump := d.cfg.SSHJumpHost + if d.cfg.SSHJumpUser != "" { + jump = d.cfg.SSHJumpUser + "@" + jump + } + if d.cfg.SSHPort > 0 && !strings.Contains(jump, ":") { + jump = fmt.Sprintf("%s:%d", jump, d.cfg.SSHPort) + } + args = append(args, "-J", jump) + } + args = append(args, target, remoteCmd) + cmd := exec.CommandContext(runCtx, "ssh", args...) 
out, err := cmd.CombinedOutput() if err != nil { trimmed := strings.TrimSpace(string(out)) diff --git a/internal/state/intent.go b/internal/state/intent.go new file mode 100644 index 0000000..f479d51 --- /dev/null +++ b/internal/state/intent.go @@ -0,0 +1,69 @@ +package state + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "time" +) + +const ( + IntentNormal = "normal" + IntentStartupInProgress = "startup_in_progress" + IntentShuttingDown = "shutting_down" + IntentShutdownComplete = "shutdown_complete" +) + +type Intent struct { + State string `json:"state"` + Reason string `json:"reason,omitempty"` + Source string `json:"source,omitempty"` + UpdatedAt time.Time `json:"updated_at"` +} + +func ReadIntent(path string) (Intent, error) { + b, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return Intent{}, nil + } + return Intent{}, err + } + if len(b) == 0 { + return Intent{}, nil + } + var in Intent + if err := json.Unmarshal(b, &in); err != nil { + return Intent{}, err + } + return in, nil +} + +func WriteIntent(path string, in Intent) error { + if in.UpdatedAt.IsZero() { + in.UpdatedAt = time.Now().UTC() + } + if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil { + return err + } + b, err := json.MarshalIndent(in, "", " ") + if err != nil { + return err + } + return os.WriteFile(path, b, 0o640) +} + +func MustWriteIntent(path string, state string, reason string, source string) error { + switch state { + case IntentNormal, IntentStartupInProgress, IntentShuttingDown, IntentShutdownComplete: + default: + return fmt.Errorf("invalid intent state: %s", state) + } + return WriteIntent(path, Intent{ + State: state, + Reason: reason, + Source: source, + UpdatedAt: time.Now().UTC(), + }) +} diff --git a/internal/state/intent_test.go b/internal/state/intent_test.go new file mode 100644 index 0000000..70886a5 --- /dev/null +++ b/internal/state/intent_test.go @@ -0,0 +1,30 @@ +package state + +import ( + "path/filepath" + "testing" +) + +func TestWriteReadIntentRoundTrip(t *testing.T) { + p := filepath.Join(t.TempDir(), "intent.json") + if err := MustWriteIntent(p, IntentShuttingDown, "ups-threshold", "daemon"); err != nil { + t.Fatalf("write intent: %v", err) + } + in, err := ReadIntent(p) + if err != nil { + t.Fatalf("read intent: %v", err) + } + if in.State != IntentShuttingDown { + t.Fatalf("expected state %q, got %q", IntentShuttingDown, in.State) + } + if in.Source != "daemon" { + t.Fatalf("expected source daemon, got %q", in.Source) + } +} + +func TestMustWriteIntentRejectsUnknownState(t *testing.T) { + p := filepath.Join(t.TempDir(), "intent.json") + if err := MustWriteIntent(p, "weird", "x", "y"); err == nil { + t.Fatalf("expected invalid state error") + } +} diff --git a/scripts/hecate-drills.sh b/scripts/hecate-drills.sh index e1f8f91..0e2ba11 100755 --- a/scripts/hecate-drills.sh +++ b/scripts/hecate-drills.sh @@ -19,6 +19,7 @@ Drills: flux-gitea-deadlock Simulate flux-controller + gitea outage and require startup recovery. foundation-recovery Simulate vault/postgres/gitea outage and require layered restore. reconciliation-resume Simulate global Flux suspend + source-controller down and require resume. + startup-intent-guard Assert startup is blocked when shutdown intent is active. Notes: - Drills are intentionally disruptive and are not part of regular `make test`. 
@@ -73,7 +74,7 @@ wait_ready() {
 
 run_hecate_startup() {
   local reason="$1"
-  local cmd=(sudo "${HECATE_BIN}" startup --config "${HECATE_CONFIG}" --execute --force-flux-branch main)
+  local cmd=(sudo "${HECATE_BIN}" startup --config "${HECATE_CONFIG}" --execute --force-flux-branch main --reason "${reason}")
   if [[ "${EXECUTE}" -eq 0 ]]; then
     log "plan: ssh ${HECATE_COORDINATOR_HOST} '${cmd[*]}'"
     return 0
@@ -272,6 +273,41 @@ run_drill_reconciliation_resume() {
   ROLLBACK_FLUX_SUSPEND=0
 }
 
+run_drill_startup_intent_guard() {
+  local intent_path="/var/lib/hecate/intent.json"
+  local backup_path="/tmp/hecate-intent-pre-drill.json"
+  local inject_cmd="
+if [ -f '${intent_path}' ]; then sudo cp '${intent_path}' '${backup_path}'; else sudo rm -f '${backup_path}'; fi
+cat <<'JSON' | sudo tee '${intent_path}' >/dev/null
+{\"state\":\"shutting_down\",\"reason\":\"drill-intent-guard\",\"source\":\"drill\",\"updated_at\":\"$(date -u +%Y-%m-%dT%H:%M:%SZ)\"}
+JSON
+"
+  local restore_cmd="
+if [ -f '${backup_path}' ]; then
+  sudo mv '${backup_path}' '${intent_path}'
+else
+  sudo rm -f '${intent_path}'
+fi
+"
+  local startup_cmd="sudo ${HECATE_BIN} startup --config ${HECATE_CONFIG} --execute --force-flux-branch main --reason drill-startup-intent-guard"
+
+  if [[ "${EXECUTE}" -eq 0 ]]; then
+    log "plan: ssh ${HECATE_COORDINATOR_HOST} 'write shutting_down intent to ${intent_path}'"
+    log "plan: ssh ${HECATE_COORDINATOR_HOST} '${startup_cmd}' (expect failure)"
+    log "plan: ssh ${HECATE_COORDINATOR_HOST} 'restore ${intent_path} from ${backup_path}'"
+    log "pass: startup-intent-guard (plan mode)"
+    return 0
+  fi
+
+  ssh "${HECATE_COORDINATOR_HOST}" "bash -lc ${inject_cmd@Q}"
+  if ssh "${HECATE_COORDINATOR_HOST}" "bash -lc ${startup_cmd@Q}"; then
+    ssh "${HECATE_COORDINATOR_HOST}" "bash -lc ${restore_cmd@Q}" || true
+    die "startup-intent-guard failed: startup unexpectedly succeeded while shutdown intent was active"
+  fi
+  ssh "${HECATE_COORDINATOR_HOST}" "bash -lc ${restore_cmd@Q}"
+  log "pass: startup-intent-guard"
+}
+
 main() {
   need_cmd "${KUBECTL}"
   need_cmd ssh
@@ -315,6 +351,9 @@ main() {
     reconciliation-resume)
       run_drill_reconciliation_resume
       ;;
+    startup-intent-guard)
+      run_drill_startup_intent_guard
+      ;;
     *)
       die "unknown drill: ${drill}"
       ;;
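The intent guard is the piece operators are most likely to poke at by hand, so here is a minimal sketch of how the new `internal/state` package could be consulted outside the daemon, for example from a small helper under `cmd/` in this repository (internal packages are only importable from within the module). The helper itself is hypothetical and not part of this patch; only `state.ReadIntent`, `state.MustWriteIntent`, the intent constants, and the `Intent` fields come from the diff above.

```go
// intentcheck: hypothetical one-off helper showing how the new state package
// is meant to be consumed. It must live inside the hecate module to import
// the internal package.
package main

import (
	"fmt"
	"log"
	"time"

	"scm.bstein.dev/bstein/hecate/internal/state"
)

func main() {
	const intentPath = "/var/lib/hecate/intent.json"

	// ReadIntent returns a zero-value Intent (State == "") when the file is absent.
	intent, err := state.ReadIntent(intentPath)
	if err != nil {
		log.Fatalf("read intent: %v", err)
	}

	switch intent.State {
	case state.IntentShuttingDown:
		// Same condition the orchestrator uses to refuse `hecate startup --execute`.
		fmt.Printf("startup would be blocked: shutdown intent active (reason=%q source=%s)\n",
			intent.Reason, intent.Source)
	case "":
		fmt.Println("no intent recorded; the startup guard would not trigger")
	default:
		fmt.Printf("intent=%s updated_at=%s\n", intent.State, intent.UpdatedAt.Format(time.RFC3339))
	}

	// After a verified recovery, a stale intent could be cleared like this:
	// _ = state.MustWriteIntent(intentPath, state.IntentNormal, "manual-clear", "operator")
}
```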