hecate(startup): add coordinated intent guards and resilient SSH recovery

Brad Stein 2026-04-04 12:44:15 -03:00
parent 4c985000a8
commit 3af6fe9f6f
12 changed files with 634 additions and 71 deletions

View File

@ -21,6 +21,11 @@ Hecate runs outside the cluster under systemd, so it can always orchestrate brin
- `hecate daemon --config /etc/hecate/hecate.yaml`
- `hecate status --config /etc/hecate/hecate.yaml`
Key startup guards (override examples follow this list):
- Startup is blocked on hosts configured as `coordination.role: peer` (unless `--allow-peer-startup` is used intentionally).
- Startup is blocked while UPS is on battery by default (unless `--allow-on-battery` or `coordination.allow_startup_on_battery: true` is set).
- Startup is blocked when a shutdown intent is active (`/var/lib/hecate/intent.json`).
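The overrides and the intent file referenced above can be exercised directly; a quick sketch using the flags and paths documented in this section:
```bash
# Deliberately run startup on a peer instance despite the role guard
hecate startup --config /etc/hecate/hecate.yaml --execute --allow-peer-startup

# Deliberately run startup while the UPS reports on-battery
hecate startup --config /etc/hecate/hecate.yaml --execute --allow-on-battery

# Inspect the intent that can block startup
cat /var/lib/hecate/intent.json
```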
## Manual install on titan-db
```bash
@ -61,12 +66,19 @@ sudo systemctl start hecate-bootstrap.service
- `systemctl start/stop k3s-agent`
- UPS telemetry available via NUT (`upsc`)
Optional SSH jump/bastion (a command sketch follows this list):
- Set `ssh_jump_host` (and optionally `ssh_jump_user`) to route node SSH through a jump host such as `titan-jh`; Hecate now falls back to direct SSH automatically if jump routing is unavailable.
- Set `ssh_port`, `ssh_identity_file`, and `ssh_node_hosts` so that root-run systemd actions can reach node SSH daemons during cold-start recovery.
- Use `ssh_node_users` for per-node username overrides (for example `titan-24: tethys`).
- Use `ssh_managed_nodes` to limit host-level SSH start/stop actions to nodes Hecate can actually authenticate to.
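As a sketch of what Hecate assembles from these settings (the values here are illustrative: the `atlas` user and identity, port 2277, `titan-jh` as jump host, and one mapped node address), the jump-path attempt looks roughly like the command below; if it fails, Hecate retries the same command without `-J`:
```bash
ssh -o BatchMode=yes -o ConnectTimeout=8 -o StrictHostKeyChecking=accept-new \
  -i /home/atlas/.ssh/id_ed25519 -p 2277 \
  -J atlas@titan-jh:2277 \
  atlas@192.168.22.30 'sudo systemctl start k3s-agent || true'
```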
## Multi-UPS topology
Recommended (a quick UPS check follows this list):
- `titan-db` runs Hecate as the shutdown coordinator with UPS `Pyrphoros` (`pyrphoros@localhost`).
- `tethys` runs Hecate as a peer with UPS `Statera` (`statera@localhost`) and forwards shutdown triggers to `titan-db`.
- If forwarding fails, a local fallback shutdown can still run where `fallback_local_shutdown` is enabled.
- Use `coordination.role: coordinator` on `titan-db` and `coordination.role: peer` on `tethys`.
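Before relying on forwarding, each UPS target can be checked directly with the NUT client; the target names below come from this topology:
```bash
# On titan-db (coordinator)
upsc pyrphoros@localhost ups.status

# On tethys (peer)
upsc statera@localhost ups.status
```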
## Config
@ -86,6 +98,7 @@ Power metrics:
## Notes
- Default behavior for `startup` and `shutdown` is dry-run unless `--execute` is set.
- Hecate tracks intent in `/var/lib/hecate/intent.json` (`normal`, `startup_in_progress`, `shutting_down`, `shutdown_complete`) so that startup and shutdown cannot fight each other; an example intent file is shown after these notes.
- `hecate-bootstrap.service`, enabled via `HECATE_ENABLE_BOOTSTRAP=1`, runs at host boot and performs staged startup automatically (recommended on `titan-db`; keep it disabled on non-coordinator hosts).
- `hecate-update.timer` runs on boot and periodically to pull latest `main` and reinstall Hecate declaratively.
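For illustration, a completed local shutdown leaves an intent file shaped like this (field names match `internal/state/intent.go`; the values shown are hypothetical):
```bash
$ cat /var/lib/hecate/intent.json
{
  "state": "shutdown_complete",
  "reason": "ups-threshold",
  "source": "daemon-local",
  "updated_at": "2026-04-04T15:44:15Z"
}
```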

View File

@ -8,6 +8,7 @@ import (
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
@ -63,19 +64,38 @@ func runStartup(logger *log.Logger, args []string) error {
execute := fs.Bool("execute", false, "Actually execute changes (default dry-run)")
forceBranch := fs.String("force-flux-branch", "", "Patch Flux source branch before resume")
skipLocalBootstrap := fs.Bool("skip-local-bootstrap", false, "Skip local fallback bootstrap applies")
allowPeerStartup := fs.Bool("allow-peer-startup", false, "Allow startup to run on a peer instance")
allowOnBattery := fs.Bool("allow-on-battery", false, "Allow startup when UPS reports on-battery")
reason := fs.String("reason", "manual-startup", "Startup reason for run history")
_ = fs.Parse(args)
_, orch, err := buildOrchestrator(logger, *configPath, !*execute)
cfg, orch, err := buildOrchestrator(logger, *configPath, !*execute)
if err != nil {
return err
}
if *execute {
if cfg.Coordination.Role == "peer" && !*allowPeerStartup {
return fmt.Errorf("startup blocked: this instance is configured as role=peer (use --allow-peer-startup to override)")
}
if cfg.UPS.Enabled && !cfg.Coordination.AllowStartupOnBattery && !*allowOnBattery {
targets, targetErr := buildUPSTargets(cfg)
if targetErr != nil {
return targetErr
}
checkCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := ensureStartupPowerSafe(checkCtx, targets); err != nil {
return err
}
}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
return orch.Startup(ctx, cluster.StartupOptions{
ForceFluxBranch: *forceBranch,
SkipLocalBootstrap: *skipLocalBootstrap,
Reason: "manual-startup",
Reason: *reason,
})
}
@ -115,34 +135,9 @@ func runDaemon(logger *log.Logger, args []string) error {
if !cfg.UPS.Enabled {
return fmt.Errorf("UPS monitoring is disabled in config")
}
targets := make([]service.Target, 0, len(cfg.UPS.Targets)+1)
switch cfg.UPS.Provider {
case "nut":
if len(cfg.UPS.Targets) == 0 {
target := cfg.UPS.Target
if target == "" {
return fmt.Errorf("ups target must be set")
}
targets = append(targets, service.Target{
Name: "primary",
Target: target,
Provider: ups.NewNUTProvider(target),
})
} else {
for idx, t := range cfg.UPS.Targets {
name := t.Name
if name == "" {
name = fmt.Sprintf("target-%d", idx+1)
}
targets = append(targets, service.Target{
Name: name,
Target: t.Target,
Provider: ups.NewNUTProvider(t.Target),
})
}
}
default:
return fmt.Errorf("unsupported UPS provider: %s", cfg.UPS.Provider)
targets, err := buildUPSTargets(cfg)
if err != nil {
return err
}
d := service.NewDaemon(cfg, orch, targets, logger)
@ -175,10 +170,69 @@ func runStatus(logger *log.Logger, args []string) error {
logger.Printf("expected_flux_branch=%s", cfg.ExpectedFluxBranch)
logger.Printf("control_planes=%v", cfg.ControlPlanes)
logger.Printf("estimated_shutdown_budget_seconds=%d", orch.EstimatedShutdownSeconds())
intent, intentErr := state.ReadIntent(cfg.State.IntentPath)
if intentErr != nil {
logger.Printf("intent_read_error=%v", intentErr)
} else if intent.State == "" {
logger.Printf("intent=none")
} else {
logger.Printf("intent=%s reason=%q source=%s updated_at=%s",
intent.State, intent.Reason, intent.Source, intent.UpdatedAt.Format(time.RFC3339))
}
logger.Printf("last_run=%s", last)
return nil
}
func buildUPSTargets(cfg config.Config) ([]service.Target, error) {
targets := make([]service.Target, 0, len(cfg.UPS.Targets)+1)
switch cfg.UPS.Provider {
case "nut":
if len(cfg.UPS.Targets) == 0 {
target := cfg.UPS.Target
if target == "" {
return nil, fmt.Errorf("ups target must be set")
}
targets = append(targets, service.Target{
Name: "primary",
Target: target,
Provider: ups.NewNUTProvider(target),
})
} else {
for idx, t := range cfg.UPS.Targets {
name := t.Name
if name == "" {
name = fmt.Sprintf("target-%d", idx+1)
}
targets = append(targets, service.Target{
Name: name,
Target: t.Target,
Provider: ups.NewNUTProvider(t.Target),
})
}
}
default:
return nil, fmt.Errorf("unsupported UPS provider: %s", cfg.UPS.Provider)
}
return targets, nil
}
func ensureStartupPowerSafe(ctx context.Context, targets []service.Target) error {
onBatteryTargets := []string{}
for _, t := range targets {
sample, err := t.Provider.Read(ctx)
if err != nil {
return fmt.Errorf("startup blocked: unable to verify UPS target %s (%s): %w", t.Name, t.Target, err)
}
if sample.OnBattery {
onBatteryTargets = append(onBatteryTargets, fmt.Sprintf("%s(status=%s runtime_s=%d)", t.Name, sample.RawStatus, sample.RuntimeSeconds))
}
}
if len(onBatteryTargets) > 0 {
return fmt.Errorf("startup blocked: UPS is on battery for %s", strings.Join(onBatteryTargets, ", "))
}
return nil
}
func buildOrchestrator(logger *log.Logger, cfgPath string, dryRun bool) (config.Config, *cluster.Orchestrator, error) {
cfg, err := config.Load(cfgPath)
if err != nil {

View File

@ -1,6 +1,13 @@
# /etc/hecate/hecate.yaml
kubeconfig: /home/atlas/.kube/config
kubeconfig: /etc/hecate/kubeconfig
ssh_user: atlas
ssh_port: 2277
ssh_identity_file: /home/atlas/.ssh/id_ed25519
ssh_node_hosts: {}
ssh_node_users: {}
ssh_managed_nodes: []
ssh_jump_host: ""
ssh_jump_user: ""
iac_repo_path: /opt/titan-iac
expected_flux_branch: main
control_planes:
@ -10,7 +17,7 @@ control_planes:
workers: []
local_bootstrap_paths:
- infrastructure/core
- infrastructure/flux-system
- clusters/atlas/flux-system
- infrastructure/sources/helm
- infrastructure/metallb
- infrastructure/traefik
@ -31,6 +38,9 @@ excluded_namespaces:
- vault
- postgres
- maintenance
startup:
api_wait_seconds: 1200
api_poll_seconds: 2
shutdown:
default_budget_seconds: 300
skip_etcd_snapshot: false
@ -57,6 +67,8 @@ coordination:
forward_shutdown_config: /etc/hecate/hecate.yaml
fallback_local_shutdown: true
command_timeout_seconds: 25
role: coordinator
allow_startup_on_battery: false
metrics:
enabled: true
bind_addr: 0.0.0.0:9560
@ -65,3 +77,4 @@ state:
dir: /var/lib/hecate
run_history_path: /var/lib/hecate/runs.json
lock_path: /var/lib/hecate/hecate.lock
intent_path: /var/lib/hecate/intent.json

View File

@ -1,6 +1,39 @@
# /etc/hecate/hecate.yaml for titan-24 (tethys forwarder)
kubeconfig: /home/tethys/.kube/config
kubeconfig: /etc/hecate/kubeconfig
ssh_user: atlas
ssh_port: 2277
ssh_identity_file: /home/tethys/.ssh/id_ed25519
ssh_node_hosts:
titan-db: 192.168.22.7
titan-0a: 192.168.22.11
titan-0b: 192.168.22.12
titan-0c: 192.168.22.13
titan-04: 192.168.22.30
titan-05: 192.168.22.31
titan-06: 192.168.22.32
titan-07: 192.168.22.33
titan-08: 192.168.22.34
titan-09: 192.168.22.35
titan-10: 192.168.22.36
titan-11: 192.168.22.37
titan-12: 192.168.22.40
titan-13: 192.168.22.41
titan-14: 192.168.22.42
titan-15: 192.168.22.43
titan-17: 192.168.22.45
titan-18: 192.168.22.46
titan-19: 192.168.22.47
titan-20: 192.168.22.20
titan-21: 192.168.22.21
titan-22: 192.168.22.22
titan-24: 192.168.22.26
ssh_node_users:
titan-24: tethys
ssh_managed_nodes:
- titan-db
- titan-24
ssh_jump_host: ""
ssh_jump_user: ""
iac_repo_path: /opt/titan-iac
expected_flux_branch: main
control_planes:
@ -22,6 +55,9 @@ excluded_namespaces:
- vault
- postgres
- maintenance
startup:
api_wait_seconds: 1200
api_poll_seconds: 2
shutdown:
default_budget_seconds: 300
skip_etcd_snapshot: false
@ -46,6 +82,8 @@ coordination:
forward_shutdown_config: /etc/hecate/hecate.yaml
fallback_local_shutdown: false
command_timeout_seconds: 25
role: peer
allow_startup_on_battery: false
metrics:
enabled: true
bind_addr: 0.0.0.0:9560
@ -54,3 +92,4 @@ state:
dir: /var/lib/hecate
run_history_path: /var/lib/hecate/runs.json
lock_path: /var/lib/hecate/hecate.lock
intent_path: /var/lib/hecate/intent.json

View File

@ -1,6 +1,47 @@
# /etc/hecate/hecate.yaml for titan-db (coordinator)
kubeconfig: /home/atlas/.kube/config
kubeconfig: /etc/hecate/kubeconfig
ssh_user: atlas
ssh_port: 2277
ssh_identity_file: /home/atlas/.ssh/id_ed25519
ssh_node_hosts:
titan-db: 192.168.22.7
titan-0a: 192.168.22.11
titan-0b: 192.168.22.12
titan-0c: 192.168.22.13
titan-04: 192.168.22.30
titan-05: 192.168.22.31
titan-06: 192.168.22.32
titan-07: 192.168.22.33
titan-08: 192.168.22.34
titan-09: 192.168.22.35
titan-10: 192.168.22.36
titan-11: 192.168.22.37
titan-12: 192.168.22.40
titan-13: 192.168.22.41
titan-14: 192.168.22.42
titan-15: 192.168.22.43
titan-17: 192.168.22.45
titan-18: 192.168.22.46
titan-19: 192.168.22.47
titan-20: 192.168.22.20
titan-21: 192.168.22.21
titan-22: 192.168.22.22
titan-24: 192.168.22.26
ssh_node_users:
titan-24: tethys
ssh_managed_nodes:
- titan-db
- titan-0a
- titan-0b
- titan-0c
- titan-12
- titan-14
- titan-15
- titan-17
- titan-18
- titan-22
ssh_jump_host: ""
ssh_jump_user: ""
iac_repo_path: /opt/titan-iac
expected_flux_branch: main
control_planes:
@ -10,7 +51,7 @@ control_planes:
workers: []
local_bootstrap_paths:
- infrastructure/core
- infrastructure/flux-system
- clusters/atlas/flux-system
- infrastructure/sources/helm
- infrastructure/metallb
- infrastructure/traefik
@ -31,6 +72,9 @@ excluded_namespaces:
- vault
- postgres
- maintenance
startup:
api_wait_seconds: 1200
api_poll_seconds: 2
shutdown:
default_budget_seconds: 300
skip_etcd_snapshot: false
@ -56,6 +100,8 @@ coordination:
forward_shutdown_config: /etc/hecate/hecate.yaml
fallback_local_shutdown: true
command_timeout_seconds: 25
role: coordinator
allow_startup_on_battery: false
metrics:
enabled: true
bind_addr: 0.0.0.0:9560
@ -64,3 +110,4 @@ state:
dir: /var/lib/hecate
run_history_path: /var/lib/hecate/runs.json
lock_path: /var/lib/hecate/hecate.lock
intent_path: /var/lib/hecate/intent.json

View File

@ -73,20 +73,47 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er
}
defer o.finalizeRecord(&record, &err)
if !o.runner.DryRun {
currentIntent, readErr := state.ReadIntent(o.cfg.State.IntentPath)
if readErr != nil {
return fmt.Errorf("read startup intent: %w", readErr)
}
if currentIntent.State == state.IntentShuttingDown {
return fmt.Errorf("startup blocked: shutdown intent is active (%s)", currentIntent.Reason)
}
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentStartupInProgress, opts.Reason, "startup"); writeErr != nil {
return fmt.Errorf("set startup intent: %w", writeErr)
}
defer func() {
if err == nil {
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, opts.Reason, "startup"); writeErr != nil {
o.log.Printf("warning: write startup completion intent failed: %v", writeErr)
}
}
}()
}
o.log.Printf("startup control-planes=%s", strings.Join(o.cfg.ControlPlanes, ","))
o.reportFluxSource(ctx, opts.ForceFluxBranch)
o.startControlPlanes(ctx, o.cfg.ControlPlanes)
apiPoll := time.Duration(o.cfg.Startup.APIPollSeconds) * time.Second
apiAttempts := o.cfg.Startup.APIWaitSeconds / o.cfg.Startup.APIPollSeconds
if apiAttempts < 1 {
apiAttempts = 1
}
if err := o.waitForAPI(ctx, apiAttempts, apiPoll); err != nil {
return err
}
workers, err := o.effectiveWorkers(ctx)
if err != nil {
return err
}
o.log.Printf("startup control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ","))
o.reportFluxSource(ctx, opts.ForceFluxBranch)
o.startControlPlanes(ctx, o.cfg.ControlPlanes)
o.log.Printf("startup workers=%s", strings.Join(workers, ","))
o.startWorkers(ctx, workers)
if err := o.waitForAPI(ctx, 120, 2*time.Second); err != nil {
return err
}
if opts.ForceFluxBranch != "" {
patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, opts.ForceFluxBranch)
if _, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "gitrepository", "flux-system", "--type=merge", "-p", patch); err != nil {
@ -167,6 +194,20 @@ func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err
StartedAt: time.Now().UTC(),
}
defer o.finalizeRecord(&record, &err)
if !o.runner.DryRun {
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentShuttingDown, opts.Reason, "shutdown"); writeErr != nil {
return fmt.Errorf("set shutdown intent: %w", writeErr)
}
defer func() {
final := state.IntentShuttingDown
if err == nil {
final = state.IntentShutdownComplete
}
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, final, opts.Reason, "shutdown"); writeErr != nil {
o.log.Printf("warning: write shutdown completion intent failed: %v", writeErr)
}
}()
}
workers, err := o.effectiveWorkers(ctx)
if err != nil {
@ -322,6 +363,10 @@ func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error
func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) {
for _, n := range workers {
if !o.sshManaged(n) {
o.log.Printf("skip stop k3s-agent on %s: node not in ssh_managed_nodes", n)
continue
}
o.bestEffort("stop k3s-agent on "+n, func() error {
_, err := o.ssh(ctx, n, "sudo systemctl stop k3s-agent || true")
return err
@ -331,6 +376,10 @@ func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) {
func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) {
for _, n := range workers {
if !o.sshManaged(n) {
o.log.Printf("skip start k3s-agent on %s: node not in ssh_managed_nodes", n)
continue
}
o.bestEffort("start k3s-agent on "+n, func() error {
_, err := o.ssh(ctx, n, "sudo systemctl start k3s-agent || true")
return err
@ -340,6 +389,10 @@ func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) {
func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) {
for _, n := range cps {
if !o.sshManaged(n) {
o.log.Printf("skip stop k3s on %s: node not in ssh_managed_nodes", n)
continue
}
o.bestEffort("stop k3s on "+n, func() error {
_, err := o.ssh(ctx, n, "sudo systemctl stop k3s || true")
return err
@ -349,6 +402,10 @@ func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) {
func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) {
for _, n := range cps {
if !o.sshManaged(n) {
o.log.Printf("skip start k3s on %s: node not in ssh_managed_nodes", n)
continue
}
o.bestEffort("start k3s on "+n, func() error {
_, err := o.ssh(ctx, n, "sudo systemctl start k3s || true")
return err
@ -357,6 +414,9 @@ func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) {
}
func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error {
if !o.sshManaged(node) {
return fmt.Errorf("cannot run etcd snapshot on %s: node not in ssh_managed_nodes", node)
}
name := "pre-shutdown-" + time.Now().UTC().Format("20060102-150405")
_, err := o.ssh(ctx, node, "sudo k3s etcd-snapshot save --name "+name)
return err
@ -404,11 +464,18 @@ func (o *Orchestrator) bootstrapLocal(ctx context.Context) error {
for _, rel := range o.cfg.LocalBootstrapPaths {
full := filepath.Join(o.cfg.IACRepoPath, rel)
o.log.Printf("local bootstrap apply -k %s", full)
if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-k", full); err != nil {
failures++
o.log.Printf("warning: local bootstrap apply failed at %s: %v", full, err)
if o.runner.DryRun {
continue
}
if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-k", full); err != nil {
o.log.Printf("warning: local bootstrap apply failed at %s: %v", full, err)
o.log.Printf("local bootstrap fallback render/apply with LoadRestrictionsNone for %s", full)
if fallbackErr := o.applyKustomizeFallback(ctx, full); fallbackErr != nil {
failures++
o.log.Printf("warning: local bootstrap fallback failed at %s: %v", full, fallbackErr)
continue
}
}
}
if failures == len(o.cfg.LocalBootstrapPaths) {
return fmt.Errorf("local bootstrap apply failed for every configured path (%d total)", failures)
@ -416,6 +483,14 @@ func (o *Orchestrator) bootstrapLocal(ctx context.Context) error {
return nil
}
func (o *Orchestrator) applyKustomizeFallback(ctx context.Context, full string) error {
cmd := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q | kubectl apply -f -", full)
if _, err := o.runSensitive(ctx, 3*time.Minute, "sh", "-lc", cmd); err != nil {
return err
}
return nil
}
func (o *Orchestrator) waitForFluxSourceReady(ctx context.Context, window time.Duration) (bool, error) {
if o.runner.DryRun {
return true, nil
@ -485,11 +560,66 @@ func (o *Orchestrator) kubectl(ctx context.Context, timeout time.Duration, args
}
func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (string, error) {
target := node
if o.cfg.SSHUser != "" {
target = o.cfg.SSHUser + "@" + node
host := node
if mapped, ok := o.cfg.SSHNodeHosts[node]; ok && strings.TrimSpace(mapped) != "" {
host = strings.TrimSpace(mapped)
}
return o.run(ctx, 45*time.Second, "ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=8", target, command)
sshUser := o.cfg.SSHUser
if override, ok := o.cfg.SSHNodeUsers[node]; ok && strings.TrimSpace(override) != "" {
sshUser = strings.TrimSpace(override)
}
target := host
if sshUser != "" {
target = sshUser + "@" + host
}
baseArgs := []string{
"-o", "BatchMode=yes",
"-o", "ConnectTimeout=8",
"-o", "StrictHostKeyChecking=accept-new",
}
if o.cfg.SSHIdentityFile != "" {
baseArgs = append(baseArgs, "-i", o.cfg.SSHIdentityFile)
}
if o.cfg.SSHPort > 0 {
baseArgs = append(baseArgs, "-p", strconv.Itoa(o.cfg.SSHPort))
}
attempts := make([][]string, 0, 2)
attemptNames := make([]string, 0, 2)
if o.cfg.SSHJumpHost != "" {
jump := o.cfg.SSHJumpHost
if o.cfg.SSHJumpUser != "" {
jump = o.cfg.SSHJumpUser + "@" + jump
}
if o.cfg.SSHPort > 0 && !strings.Contains(jump, ":") {
jump = fmt.Sprintf("%s:%d", jump, o.cfg.SSHPort)
}
withJump := append([]string{}, baseArgs...)
withJump = append(withJump, "-J", jump, target, command)
attempts = append(attempts, withJump)
attemptNames = append(attemptNames, "jump")
}
direct := append([]string{}, baseArgs...)
direct = append(direct, target, command)
attempts = append(attempts, direct)
attemptNames = append(attemptNames, "direct")
var lastOut string
var lastErr error
for i, args := range attempts {
out, err := o.run(ctx, 45*time.Second, "ssh", args...)
if err == nil {
if i > 0 {
o.log.Printf("warning: ssh %s path failed for %s, using %s path", attemptNames[i-1], node, attemptNames[i])
}
return out, nil
}
lastOut = out
lastErr = err
if i < len(attempts)-1 {
o.log.Printf("warning: ssh %s path failed for %s: %v; trying %s path", attemptNames[i], node, err, attemptNames[i+1])
}
}
return lastOut, lastErr
}
func (o *Orchestrator) run(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
@ -534,6 +664,18 @@ func lines(in string) []string {
return out
}
func (o *Orchestrator) sshManaged(node string) bool {
if len(o.cfg.SSHManagedNodes) == 0 {
return true
}
for _, allowed := range o.cfg.SSHManagedNodes {
if strings.TrimSpace(allowed) == node {
return true
}
}
return false
}
func (o *Orchestrator) bestEffort(name string, fn func() error) {
if err := fn(); err != nil {
o.log.Printf("warning: %s: %v", name, err)

View File

@ -8,19 +8,32 @@ import (
)
type Config struct {
Kubeconfig string `yaml:"kubeconfig"`
SSHUser string `yaml:"ssh_user"`
IACRepoPath string `yaml:"iac_repo_path"`
ExpectedFluxBranch string `yaml:"expected_flux_branch"`
ControlPlanes []string `yaml:"control_planes"`
Workers []string `yaml:"workers"`
LocalBootstrapPaths []string `yaml:"local_bootstrap_paths"`
ExcludedNamespaces []string `yaml:"excluded_namespaces"`
Shutdown Shutdown `yaml:"shutdown"`
UPS UPS `yaml:"ups"`
Coordination Coordination `yaml:"coordination"`
Metrics Metrics `yaml:"metrics"`
State State `yaml:"state"`
Kubeconfig string `yaml:"kubeconfig"`
SSHUser string `yaml:"ssh_user"`
SSHPort int `yaml:"ssh_port"`
SSHIdentityFile string `yaml:"ssh_identity_file"`
SSHNodeHosts map[string]string `yaml:"ssh_node_hosts"`
SSHNodeUsers map[string]string `yaml:"ssh_node_users"`
SSHManagedNodes []string `yaml:"ssh_managed_nodes"`
SSHJumpHost string `yaml:"ssh_jump_host"`
SSHJumpUser string `yaml:"ssh_jump_user"`
IACRepoPath string `yaml:"iac_repo_path"`
ExpectedFluxBranch string `yaml:"expected_flux_branch"`
ControlPlanes []string `yaml:"control_planes"`
Workers []string `yaml:"workers"`
LocalBootstrapPaths []string `yaml:"local_bootstrap_paths"`
ExcludedNamespaces []string `yaml:"excluded_namespaces"`
Startup Startup `yaml:"startup"`
Shutdown Shutdown `yaml:"shutdown"`
UPS UPS `yaml:"ups"`
Coordination Coordination `yaml:"coordination"`
Metrics Metrics `yaml:"metrics"`
State State `yaml:"state"`
}
type Startup struct {
APIWaitSeconds int `yaml:"api_wait_seconds"`
APIPollSeconds int `yaml:"api_poll_seconds"`
}
type Shutdown struct {
@ -55,6 +68,8 @@ type Coordination struct {
ForwardShutdownConfig string `yaml:"forward_shutdown_config"`
FallbackLocalShutdown bool `yaml:"fallback_local_shutdown"`
CommandTimeoutSeconds int `yaml:"command_timeout_seconds"`
Role string `yaml:"role"`
AllowStartupOnBattery bool `yaml:"allow_startup_on_battery"`
}
type Metrics struct {
@ -67,6 +82,7 @@ type State struct {
Dir string `yaml:"dir"`
RunHistoryPath string `yaml:"run_history_path"`
LockPath string `yaml:"lock_path"`
IntentPath string `yaml:"intent_path"`
}
func Load(path string) (Config, error) {
@ -100,6 +116,15 @@ func (c Config) Validate() error {
if c.Shutdown.DefaultBudgetSeconds <= 0 {
return fmt.Errorf("config.shutdown.default_budget_seconds must be > 0")
}
if c.Startup.APIWaitSeconds <= 0 {
return fmt.Errorf("config.startup.api_wait_seconds must be > 0")
}
if c.Startup.APIPollSeconds <= 0 {
return fmt.Errorf("config.startup.api_poll_seconds must be > 0")
}
if c.SSHPort <= 0 || c.SSHPort > 65535 {
return fmt.Errorf("config.ssh_port must be in range 1-65535")
}
if c.UPS.Enabled {
if c.UPS.Provider == "" {
return fmt.Errorf("config.ups.provider must not be empty when ups is enabled")
@ -118,9 +143,15 @@ func (c Config) Validate() error {
return fmt.Errorf("config.coordination.forward_shutdown_config must not be empty when forward_shutdown_host is set")
}
}
if c.Coordination.Role != "coordinator" && c.Coordination.Role != "peer" {
return fmt.Errorf("config.coordination.role must be coordinator or peer")
}
if c.State.RunHistoryPath == "" || c.State.LockPath == "" {
return fmt.Errorf("config.state.run_history_path and config.state.lock_path must not be empty")
}
if c.State.IntentPath == "" {
return fmt.Errorf("config.state.intent_path must not be empty")
}
return nil
}
@ -128,10 +159,11 @@ func defaults() Config {
c := Config{
IACRepoPath: "/opt/titan-iac",
ExpectedFluxBranch: "main",
SSHPort: 2277,
ControlPlanes: []string{"titan-0a", "titan-0b", "titan-0c"},
LocalBootstrapPaths: []string{
"infrastructure/core",
"infrastructure/flux-system",
"clusters/atlas/flux-system",
"infrastructure/sources/helm",
"infrastructure/metallb",
"infrastructure/traefik",
@ -154,6 +186,10 @@ func defaults() Config {
"postgres",
"maintenance",
},
Startup: Startup{
APIWaitSeconds: 1200,
APIPollSeconds: 2,
},
Shutdown: Shutdown{
DefaultBudgetSeconds: 300,
PoweroffEnabled: true,
@ -172,6 +208,8 @@ func defaults() Config {
ForwardShutdownConfig: "/etc/hecate/hecate.yaml",
FallbackLocalShutdown: true,
CommandTimeoutSeconds: 25,
Role: "coordinator",
AllowStartupOnBattery: false,
},
Metrics: Metrics{
Enabled: true,
@ -182,6 +220,7 @@ func defaults() Config {
Dir: "/var/lib/hecate",
RunHistoryPath: "/var/lib/hecate/runs.json",
LockPath: "/var/lib/hecate/hecate.lock",
IntentPath: "/var/lib/hecate/intent.json",
},
}
c.applyDefaults()
@ -195,6 +234,15 @@ func (c *Config) applyDefaults() {
if c.IACRepoPath == "" {
c.IACRepoPath = "/opt/titan-iac"
}
if c.Startup.APIWaitSeconds <= 0 {
c.Startup.APIWaitSeconds = 1200
}
if c.Startup.APIPollSeconds <= 0 {
c.Startup.APIPollSeconds = 2
}
if c.SSHPort <= 0 {
c.SSHPort = 2277
}
if c.Shutdown.DefaultBudgetSeconds <= 0 {
c.Shutdown.DefaultBudgetSeconds = 300
}
@ -219,6 +267,9 @@ func (c *Config) applyDefaults() {
if c.Coordination.CommandTimeoutSeconds <= 0 {
c.Coordination.CommandTimeoutSeconds = 25
}
if c.Coordination.Role == "" {
c.Coordination.Role = "coordinator"
}
if c.Metrics.BindAddr == "" {
c.Metrics.BindAddr = "0.0.0.0:9560"
}
@ -234,4 +285,7 @@ func (c *Config) applyDefaults() {
if c.State.LockPath == "" {
c.State.LockPath = "/var/lib/hecate/hecate.lock"
}
if c.State.IntentPath == "" {
c.State.IntentPath = "/var/lib/hecate/intent.json"
}
}

View File

@ -47,3 +47,11 @@ func TestValidateForwardShutdownRequiresConfigPath(t *testing.T) {
t.Fatalf("expected validation error for missing forward_shutdown_config")
}
}
func TestValidateRejectsUnknownRole(t *testing.T) {
cfg := defaults()
cfg.Coordination.Role = "unknown"
if err := cfg.Validate(); err == nil {
t.Fatalf("expected validation error for unknown coordination role")
}
}

View File

@ -7,12 +7,14 @@ import (
"math"
"net/http"
"os/exec"
"strconv"
"strings"
"time"
"scm.bstein.dev/bstein/hecate/internal/cluster"
"scm.bstein.dev/bstein/hecate/internal/config"
"scm.bstein.dev/bstein/hecate/internal/metrics"
"scm.bstein.dev/bstein/hecate/internal/state"
"scm.bstein.dev/bstein/hecate/internal/ups"
)
@ -149,10 +151,22 @@ func (d *Daemon) Run(ctx context.Context) error {
}
func (d *Daemon) triggerShutdown(ctx context.Context, reason string) error {
intent, err := state.ReadIntent(d.cfg.State.IntentPath)
if err == nil && intent.State == state.IntentShuttingDown {
d.log.Printf("shutdown already in progress; skipping duplicate trigger: %s", reason)
return nil
}
if err := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShuttingDown, reason, "daemon"); err != nil {
d.log.Printf("warning: unable to persist shutdown intent before trigger: %v", err)
}
d.log.Printf("triggering shutdown: %s", reason)
d.exporter.MarkShutdown(reason)
if d.cfg.Coordination.ForwardShutdownHost != "" {
if err := d.forwardShutdown(ctx, reason); err == nil {
if setErr := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShutdownComplete, reason, "daemon-forwarded"); setErr != nil {
d.log.Printf("warning: unable to persist forwarded shutdown completion intent: %v", setErr)
}
d.log.Printf("shutdown trigger forwarded to %s", d.cfg.Coordination.ForwardShutdownHost)
return nil
} else if !d.cfg.Coordination.FallbackLocalShutdown {
@ -161,14 +175,16 @@ func (d *Daemon) triggerShutdown(ctx context.Context, reason string) error {
d.log.Printf("warning: forward shutdown failed; falling back to local shutdown: %v", err)
}
}
return d.orch.Shutdown(ctx, cluster.ShutdownOptions{Reason: reason})
if err := d.orch.Shutdown(ctx, cluster.ShutdownOptions{Reason: reason}); err != nil {
return err
}
if setErr := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShutdownComplete, reason, "daemon-local"); setErr != nil {
d.log.Printf("warning: unable to persist local shutdown completion intent: %v", setErr)
}
return nil
}
func (d *Daemon) forwardShutdown(ctx context.Context, reason string) error {
userHost := d.cfg.Coordination.ForwardShutdownHost
if d.cfg.Coordination.ForwardShutdownUser != "" {
userHost = d.cfg.Coordination.ForwardShutdownUser + "@" + userHost
}
timeout := time.Duration(d.cfg.Coordination.CommandTimeoutSeconds) * time.Second
if timeout <= 0 {
timeout = 25 * time.Second
@ -181,7 +197,46 @@ func (d *Daemon) forwardShutdown(ctx context.Context, reason string) error {
d.cfg.Coordination.ForwardShutdownConfig,
reason,
)
cmd := exec.CommandContext(runCtx, "ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=8", userHost, remoteCmd)
host := d.cfg.Coordination.ForwardShutdownHost
if mapped, ok := d.cfg.SSHNodeHosts[host]; ok && strings.TrimSpace(mapped) != "" {
host = strings.TrimSpace(mapped)
}
user := d.cfg.Coordination.ForwardShutdownUser
if user == "" {
if override, ok := d.cfg.SSHNodeUsers[d.cfg.Coordination.ForwardShutdownHost]; ok && strings.TrimSpace(override) != "" {
user = strings.TrimSpace(override)
} else {
user = d.cfg.SSHUser
}
}
target := host
if user != "" {
target = user + "@" + host
}
args := []string{
"-o", "BatchMode=yes",
"-o", "ConnectTimeout=8",
"-o", "StrictHostKeyChecking=accept-new",
}
if d.cfg.SSHIdentityFile != "" {
args = append(args, "-i", d.cfg.SSHIdentityFile)
}
if d.cfg.SSHPort > 0 {
args = append(args, "-p", strconv.Itoa(d.cfg.SSHPort))
}
if d.cfg.SSHJumpHost != "" {
jump := d.cfg.SSHJumpHost
if d.cfg.SSHJumpUser != "" {
jump = d.cfg.SSHJumpUser + "@" + jump
}
if d.cfg.SSHPort > 0 && !strings.Contains(jump, ":") {
jump = fmt.Sprintf("%s:%d", jump, d.cfg.SSHPort)
}
args = append(args, "-J", jump)
}
args = append(args, target, remoteCmd)
cmd := exec.CommandContext(runCtx, "ssh", args...)
out, err := cmd.CombinedOutput()
if err != nil {
trimmed := strings.TrimSpace(string(out))

internal/state/intent.go (new file, 69 lines)
View File
View File

@ -0,0 +1,69 @@
package state
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
)
const (
IntentNormal = "normal"
IntentStartupInProgress = "startup_in_progress"
IntentShuttingDown = "shutting_down"
IntentShutdownComplete = "shutdown_complete"
)
type Intent struct {
State string `json:"state"`
Reason string `json:"reason,omitempty"`
Source string `json:"source,omitempty"`
UpdatedAt time.Time `json:"updated_at"`
}
func ReadIntent(path string) (Intent, error) {
b, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return Intent{}, nil
}
return Intent{}, err
}
if len(b) == 0 {
return Intent{}, nil
}
var in Intent
if err := json.Unmarshal(b, &in); err != nil {
return Intent{}, err
}
return in, nil
}
func WriteIntent(path string, in Intent) error {
if in.UpdatedAt.IsZero() {
in.UpdatedAt = time.Now().UTC()
}
if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil {
return err
}
b, err := json.MarshalIndent(in, "", " ")
if err != nil {
return err
}
return os.WriteFile(path, b, 0o640)
}
func MustWriteIntent(path string, state string, reason string, source string) error {
switch state {
case IntentNormal, IntentStartupInProgress, IntentShuttingDown, IntentShutdownComplete:
default:
return fmt.Errorf("invalid intent state: %s", state)
}
return WriteIntent(path, Intent{
State: state,
Reason: reason,
Source: source,
UpdatedAt: time.Now().UTC(),
})
}

View File

@ -0,0 +1,30 @@
package state
import (
"path/filepath"
"testing"
)
func TestWriteReadIntentRoundTrip(t *testing.T) {
p := filepath.Join(t.TempDir(), "intent.json")
if err := MustWriteIntent(p, IntentShuttingDown, "ups-threshold", "daemon"); err != nil {
t.Fatalf("write intent: %v", err)
}
in, err := ReadIntent(p)
if err != nil {
t.Fatalf("read intent: %v", err)
}
if in.State != IntentShuttingDown {
t.Fatalf("expected state %q, got %q", IntentShuttingDown, in.State)
}
if in.Source != "daemon" {
t.Fatalf("expected source daemon, got %q", in.Source)
}
}
func TestMustWriteIntentRejectsUnknownState(t *testing.T) {
p := filepath.Join(t.TempDir(), "intent.json")
if err := MustWriteIntent(p, "weird", "x", "y"); err == nil {
t.Fatalf("expected invalid state error")
}
}

View File

@ -19,6 +19,7 @@ Drills:
flux-gitea-deadlock Simulate flux-controller + gitea outage and require startup recovery.
foundation-recovery Simulate vault/postgres/gitea outage and require layered restore.
reconciliation-resume Simulate global Flux suspend + source-controller down and require resume.
startup-intent-guard Assert startup is blocked when shutdown intent is active (invocation sketch after the notes).
Notes:
- Drills are intentionally disruptive and are not part of regular `make test`.
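A hypothetical invocation of the new drill (the script name and the `EXECUTE` environment toggle are assumptions; adjust to however the drill runner is actually wired):
```bash
# Plan mode: print the inject/startup/restore steps without running them
./drills.sh startup-intent-guard

# Armed run: inject the shutdown intent, expect startup to fail, then restore
EXECUTE=1 ./drills.sh startup-intent-guard
```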
@ -73,7 +74,7 @@ wait_ready() {
run_hecate_startup() {
local reason="$1"
local cmd=(sudo "${HECATE_BIN}" startup --config "${HECATE_CONFIG}" --execute --force-flux-branch main)
local cmd=(sudo "${HECATE_BIN}" startup --config "${HECATE_CONFIG}" --execute --force-flux-branch main --reason "${reason}")
if [[ "${EXECUTE}" -eq 0 ]]; then
log "plan: ssh ${HECATE_COORDINATOR_HOST} '${cmd[*]}'"
return 0
@ -272,6 +273,41 @@ run_drill_reconciliation_resume() {
ROLLBACK_FLUX_SUSPEND=0
}
run_drill_startup_intent_guard() {
local intent_path="/var/lib/hecate/intent.json"
local backup_path="/tmp/hecate-intent-pre-drill.json"
local inject_cmd="
if [ -f '${intent_path}' ]; then sudo cp '${intent_path}' '${backup_path}'; else sudo rm -f '${backup_path}'; fi
cat <<'JSON' | sudo tee '${intent_path}' >/dev/null
{\"state\":\"shutting_down\",\"reason\":\"drill-intent-guard\",\"source\":\"drill\",\"updated_at\":\"$(date -u +%Y-%m-%dT%H:%M:%SZ)\"}
JSON
"
local restore_cmd="
if [ -f '${backup_path}' ]; then
sudo mv '${backup_path}' '${intent_path}'
else
sudo rm -f '${intent_path}'
fi
"
local startup_cmd="sudo ${HECATE_BIN} startup --config ${HECATE_CONFIG} --execute --force-flux-branch main --reason drill-startup-intent-guard"
if [[ "${EXECUTE}" -eq 0 ]]; then
log "plan: ssh ${HECATE_COORDINATOR_HOST} '<inject shutdown intent>'"
log "plan: ssh ${HECATE_COORDINATOR_HOST} '${startup_cmd}' (expect failure)"
log "plan: ssh ${HECATE_COORDINATOR_HOST} '<restore prior intent>'"
log "pass: startup-intent-guard (plan mode)"
return 0
fi
ssh "${HECATE_COORDINATOR_HOST}" "bash -lc ${inject_cmd@Q}"
if ssh "${HECATE_COORDINATOR_HOST}" "bash -lc ${startup_cmd@Q}"; then
ssh "${HECATE_COORDINATOR_HOST}" "bash -lc ${restore_cmd@Q}" || true
die "startup-intent-guard failed: startup unexpectedly succeeded while shutdown intent was active"
fi
ssh "${HECATE_COORDINATOR_HOST}" "bash -lc ${restore_cmd@Q}"
log "pass: startup-intent-guard"
}
main() {
need_cmd "${KUBECTL}"
need_cmd ssh
@ -315,6 +351,9 @@ main() {
reconciliation-resume)
run_drill_reconciliation_resume
;;
startup-intent-guard)
run_drill_startup_intent_guard
;;
*)
die "unknown drill: ${drill}"
;;