commit fbdb2c269bcfe91432ade7c2feff95777ded488a Author: Brad Stein Date: Fri Apr 3 01:43:16 2026 -0300 bootstrap: scaffold hecate startup/shutdown service diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7b803b4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/bin/ +/dist/ +*.log +*.tmp diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..996496d --- /dev/null +++ b/Makefile @@ -0,0 +1,16 @@ +.PHONY: build test fmt tidy install + +build: + go build -o dist/hecate ./cmd/hecate + +test: + go test ./... + +fmt: + gofmt -w ./cmd ./internal + +tidy: + go mod tidy + +install: + sudo ./scripts/install.sh diff --git a/README.md b/README.md new file mode 100644 index 0000000..a4a6e87 --- /dev/null +++ b/README.md @@ -0,0 +1,61 @@ +# Hecate + +Hecate is the host-level bootstrap and power-protection service for Titan. + +It runs on `titan-db` and handles: +- Staged **startup** (including Flux/Gitea bootstrap deadlock fallback) +- Graceful **shutdown** +- UPS-driven automatic shutdown decisions based on discharge/runtime + +## Why host-level + +A service inside Kubernetes cannot start a cluster that is fully down. +Hecate runs outside the cluster under systemd, so it can always orchestrate bring-up. + +## Commands + +- `hecate startup --config /etc/hecate/hecate.yaml --execute --force-flux-branch main` +- `hecate shutdown --config /etc/hecate/hecate.yaml --execute` +- `hecate daemon --config /etc/hecate/hecate.yaml` +- `hecate status --config /etc/hecate/hecate.yaml` + +## Manual install on titan-db + +```bash +git clone git@gitea-admin:bstein/hecate.git +cd hecate +sudo ./scripts/install.sh +sudoedit /etc/hecate/hecate.yaml +sudo systemctl restart hecate.service +``` + +Bootstrap now (without reboot): + +```bash +sudo systemctl start hecate-bootstrap.service +``` + +## Preconditions on titan-db + +- `kubectl` installed and configured (`kubeconfig` path in config) +- SSH reachability to all cluster nodes +- Remote sudo rights to run: + - `systemctl start/stop k3s` + - `systemctl start/stop k3s-agent` +- UPS telemetry available via NUT (`upsc`) + +## Config + +See `configs/hecate.example.yaml`. + +UPS auto-shutdown trigger uses: +- runtime threshold = `runtime_safety_factor * estimated_shutdown_budget` +- default safety factor `1.10` +- debounce across multiple polls to avoid noise + +Estimated shutdown budget is derived from historical successful shutdown runs (`/var/lib/hecate/runs.json`) with default fallback from config. + +## Notes + +- Default behavior for `startup` and `shutdown` is dry-run unless `--execute` is set. +- `hecate-bootstrap.service` is enabled to run at host boot and perform staged startup automatically. 
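+For concreteness, the trigger arithmetic from the Config section as a minimal
+standalone sketch (the numbers are the config defaults above, not measured data):
+
+```go
+package main
+
+import (
+	"fmt"
+	"math"
+)
+
+func main() {
+	budget := 300.0 // estimated_shutdown_budget_seconds fallback
+	factor := 1.10  // runtime_safety_factor
+	threshold := int(math.Ceil(budget * factor))
+	fmt.Println(threshold) // 330: shutdown triggers once UPS runtime <= 330s
+	// ...but only after debounce_count (3) consecutive breached polls,
+	// i.e. roughly 15s of sustained breach at the default 5s poll interval.
+}
+```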
diff --git a/cmd/hecate/main.go b/cmd/hecate/main.go new file mode 100644 index 0000000..b8694a5 --- /dev/null +++ b/cmd/hecate/main.go @@ -0,0 +1,191 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + "scm.bstein.dev/bstein/hecate/internal/cluster" + "scm.bstein.dev/bstein/hecate/internal/config" + "scm.bstein.dev/bstein/hecate/internal/execx" + "scm.bstein.dev/bstein/hecate/internal/service" + "scm.bstein.dev/bstein/hecate/internal/state" + "scm.bstein.dev/bstein/hecate/internal/ups" +) + +func main() { + logger := log.New(os.Stdout, "[hecate] ", log.LstdFlags) + if len(os.Args) < 2 { + usage() + os.Exit(2) + } + + sub := os.Args[1] + switch sub { + case "startup": + if err := runStartup(logger, os.Args[2:]); err != nil { + logger.Printf("startup failed: %v", err) + os.Exit(1) + } + case "shutdown": + if err := runShutdown(logger, os.Args[2:]); err != nil { + logger.Printf("shutdown failed: %v", err) + os.Exit(1) + } + case "daemon": + if err := runDaemon(logger, os.Args[2:]); err != nil { + logger.Printf("daemon failed: %v", err) + os.Exit(1) + } + case "status": + if err := runStatus(logger, os.Args[2:]); err != nil { + logger.Printf("status failed: %v", err) + os.Exit(1) + } + case "help", "-h", "--help": + usage() + default: + logger.Printf("unknown command: %s", sub) + usage() + os.Exit(2) + } +} + +func runStartup(logger *log.Logger, args []string) error { + fs := flag.NewFlagSet("startup", flag.ExitOnError) + configPath := fs.String("config", "/etc/hecate/hecate.yaml", "Path to config file") + execute := fs.Bool("execute", false, "Actually execute changes (default dry-run)") + forceBranch := fs.String("force-flux-branch", "", "Patch Flux source branch before resume") + skipLocalBootstrap := fs.Bool("skip-local-bootstrap", false, "Skip local fallback bootstrap applies") + _ = fs.Parse(args) + + _, orch, err := buildOrchestrator(logger, *configPath, !*execute) + if err != nil { + return err + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + return orch.Startup(ctx, cluster.StartupOptions{ + ForceFluxBranch: *forceBranch, + SkipLocalBootstrap: *skipLocalBootstrap, + Reason: "manual-startup", + }) +} + +func runShutdown(logger *log.Logger, args []string) error { + fs := flag.NewFlagSet("shutdown", flag.ExitOnError) + configPath := fs.String("config", "/etc/hecate/hecate.yaml", "Path to config file") + execute := fs.Bool("execute", false, "Actually execute changes (default dry-run)") + skipEtcd := fs.Bool("skip-etcd-snapshot", false, "Skip etcd snapshot") + skipDrain := fs.Bool("skip-drain", false, "Skip worker drain") + _ = fs.Parse(args) + + _, orch, err := buildOrchestrator(logger, *configPath, !*execute) + if err != nil { + return err + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + return orch.Shutdown(ctx, cluster.ShutdownOptions{ + SkipEtcdSnapshot: *skipEtcd, + SkipDrain: *skipDrain, + Reason: "manual-shutdown", + }) +} + +func runDaemon(logger *log.Logger, args []string) error { + fs := flag.NewFlagSet("daemon", flag.ExitOnError) + configPath := fs.String("config", "/etc/hecate/hecate.yaml", "Path to config file") + dryRunActions := fs.Bool("dry-run-actions", false, "Log planned actions without executing") + _ = fs.Parse(args) + + cfg, orch, err := buildOrchestrator(logger, *configPath, *dryRunActions) + if err != nil { + return err + } + if !cfg.UPS.Enabled { + return fmt.Errorf("UPS monitoring is disabled in config") + } + var provider ups.Provider + 
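+	// Select the telemetry backend; "nut" (which shells out to upsc) is the only provider implemented here, so any other value is rejected below.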
switch cfg.UPS.Provider { + case "nut": + provider = ups.NewNUTProvider(cfg.UPS.Target) + default: + return fmt.Errorf("unsupported UPS provider: %s", cfg.UPS.Provider) + } + + d := service.NewDaemon(cfg, orch, provider, logger) + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + return d.Run(ctx) +} + +func runStatus(logger *log.Logger, args []string) error { + fs := flag.NewFlagSet("status", flag.ExitOnError) + configPath := fs.String("config", "/etc/hecate/hecate.yaml", "Path to config file") + _ = fs.Parse(args) + + cfg, orch, err := buildOrchestrator(logger, *configPath, true) + if err != nil { + return err + } + recs, err := state.New(cfg.State.RunHistoryPath).Load() + if err != nil { + return err + } + last := "none" + if len(recs) > 0 { + r := recs[len(recs)-1] + last = fmt.Sprintf("%s success=%t duration=%ds at=%s", r.Action, r.Success, r.DurationSeconds, r.EndedAt.Format(time.RFC3339)) + } + logger.Printf("expected_flux_branch=%s", cfg.ExpectedFluxBranch) + logger.Printf("control_planes=%v", cfg.ControlPlanes) + logger.Printf("estimated_shutdown_budget_seconds=%d", orch.EstimatedShutdownSeconds()) + logger.Printf("last_run=%s", last) + return nil +} + +func buildOrchestrator(logger *log.Logger, cfgPath string, dryRun bool) (config.Config, *cluster.Orchestrator, error) { + cfg, err := config.Load(cfgPath) + if err != nil { + return config.Config{}, nil, err + } + if err := state.EnsureDir(cfg.State.Dir); err != nil { + return config.Config{}, nil, err + } + runner := &execx.Runner{ + DryRun: dryRun, + Kubeconfig: cfg.Kubeconfig, + Logger: logger, + } + store := state.New(cfg.State.RunHistoryPath) + orch := cluster.New(cfg, runner, store, logger) + return cfg, orch, nil +} + +func usage() { + fmt.Print(`hecate: staged startup/shutdown + UPS-triggered protection + +Usage: + hecate [flags] + +Commands: + startup Perform staged cluster startup + shutdown Perform graceful cluster shutdown + daemon Monitor UPS and auto-trigger shutdown + status Print current hecate status and estimates + +Examples: + hecate startup --config /etc/hecate/hecate.yaml --execute --force-flux-branch main + hecate shutdown --config /etc/hecate/hecate.yaml --execute + hecate daemon --config /etc/hecate/hecate.yaml + hecate status --config /etc/hecate/hecate.yaml +`) +} diff --git a/configs/hecate.example.yaml b/configs/hecate.example.yaml new file mode 100644 index 0000000..d27b369 --- /dev/null +++ b/configs/hecate.example.yaml @@ -0,0 +1,48 @@ +# /etc/hecate/hecate.yaml +kubeconfig: /home/atlas/.kube/config +ssh_user: atlas +iac_repo_path: /opt/titan-iac +expected_flux_branch: main +control_planes: + - titan-0a + - titan-0b + - titan-0c +workers: [] +local_bootstrap_paths: + - infrastructure/core + - infrastructure/sources/helm + - infrastructure/metallb + - infrastructure/traefik + - infrastructure/vault-csi + - infrastructure/vault-injector + - services/vault + - infrastructure/postgres + - services/gitea +excluded_namespaces: + - kube-system + - kube-public + - kube-node-lease + - flux-system + - traefik + - metallb-system + - cert-manager + - longhorn-system + - vault + - postgres + - maintenance +shutdown: + default_budget_seconds: 300 + skip_etcd_snapshot: false + skip_drain: false +ups: + enabled: true + provider: nut + target: atlasups@localhost + poll_seconds: 5 + runtime_safety_factor: 1.10 + debounce_count: 3 + telemetry_timeout_seconds: 90 +state: + dir: /var/lib/hecate + run_history_path: /var/lib/hecate/runs.json + lock_path: 
/var/lib/hecate/hecate.lock diff --git a/deploy/systemd/hecate-bootstrap.service b/deploy/systemd/hecate-bootstrap.service new file mode 100644 index 0000000..526cf9b --- /dev/null +++ b/deploy/systemd/hecate-bootstrap.service @@ -0,0 +1,15 @@ +[Unit] +Description=Hecate Staged Cluster Bootstrap +Wants=network-online.target +After=network-online.target +ConditionPathExists=/etc/hecate/hecate.yaml + +[Service] +Type=oneshot +User=root +Group=root +ExecStart=/usr/local/bin/hecate startup --config /etc/hecate/hecate.yaml --execute --force-flux-branch main +TimeoutStartSec=1800 + +[Install] +WantedBy=multi-user.target diff --git a/deploy/systemd/hecate.service b/deploy/systemd/hecate.service new file mode 100644 index 0000000..cc78d20 --- /dev/null +++ b/deploy/systemd/hecate.service @@ -0,0 +1,17 @@ +[Unit] +Description=Hecate UPS Monitor and Auto Shutdown Orchestrator +Wants=network-online.target +After=network-online.target +ConditionPathExists=/etc/hecate/hecate.yaml + +[Service] +Type=simple +User=root +Group=root +ExecStart=/usr/local/bin/hecate daemon --config /etc/hecate/hecate.yaml +Restart=always +RestartSec=5 +NoNewPrivileges=true + +[Install] +WantedBy=multi-user.target diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ceb106a --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module scm.bstein.dev/bstein/hecate + +go 1.25 + +require gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a62c313 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/cluster/orchestrator.go b/internal/cluster/orchestrator.go new file mode 100644 index 0000000..5dcfa20 --- /dev/null +++ b/internal/cluster/orchestrator.go @@ -0,0 +1,415 @@ +package cluster + +import ( + "context" + "fmt" + "log" + "path/filepath" + "strings" + "time" + + "scm.bstein.dev/bstein/hecate/internal/config" + "scm.bstein.dev/bstein/hecate/internal/execx" + "scm.bstein.dev/bstein/hecate/internal/state" +) + +type Orchestrator struct { + cfg config.Config + runner *execx.Runner + store *state.Store + log *log.Logger +} + +type StartupOptions struct { + ForceFluxBranch string + SkipLocalBootstrap bool + Reason string +} + +type ShutdownOptions struct { + SkipEtcdSnapshot bool + SkipDrain bool + Reason string +} + +func New(cfg config.Config, runner *execx.Runner, store *state.Store, logger *log.Logger) *Orchestrator { + return &Orchestrator{cfg: cfg, runner: runner, store: store, log: logger} +} + +func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err error) { + unlock, err := state.AcquireLock(o.cfg.State.LockPath) + if err != nil { + return err + } + defer unlock() + + record := state.RunRecord{ + ID: fmt.Sprintf("startup-%d", time.Now().UnixNano()), + Action: "startup", + Reason: opts.Reason, + StartedAt: time.Now().UTC(), + } + defer o.finalizeRecord(&record, &err) + + workers, err := o.effectiveWorkers(ctx) + if err != nil { + return err + } + o.log.Printf("startup control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ",")) + + o.reportFluxSource(ctx, opts.ForceFluxBranch) + o.startControlPlanes(ctx, o.cfg.ControlPlanes) + o.startWorkers(ctx, workers) + 
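+	// All node services have now been asked to start (best-effort, control planes before workers); everything below gates on the Kubernetes API answering before any Flux work begins.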
+	if err := o.waitForAPI(ctx, 120, 2*time.Second); err != nil {
+		return err
+	}
+
+	if opts.ForceFluxBranch != "" {
+		patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, opts.ForceFluxBranch)
+		if _, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "gitrepository", "flux-system", "--type=merge", "-p", patch); err != nil {
+			return fmt.Errorf("force flux branch: %w", err)
+		}
+	}
+
+	if !opts.SkipLocalBootstrap {
+		ready, readyErr := o.fluxSourceReady(ctx)
+		if readyErr != nil {
+			o.log.Printf("warning: unable to read flux source readiness: %v", readyErr)
+		}
+		if !ready {
+			o.log.Printf("flux source not ready, applying local bootstrap path")
+			if err := o.bootstrapLocal(ctx); err != nil {
+				return err
+			}
+		}
+	}
+
+	if err := o.resumeFluxAndReconcile(ctx); err != nil {
+		return err
+	}
+	o.log.Printf("startup flow complete")
+	return nil
+}
+
+func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error) {
+	unlock, err := state.AcquireLock(o.cfg.State.LockPath)
+	if err != nil {
+		return err
+	}
+	defer unlock()
+
+	record := state.RunRecord{
+		ID:        fmt.Sprintf("shutdown-%d", time.Now().UnixNano()),
+		Action:    "shutdown",
+		Reason:    opts.Reason,
+		StartedAt: time.Now().UTC(),
+	}
+	defer o.finalizeRecord(&record, &err)
+
+	workers, err := o.effectiveWorkers(ctx)
+	if err != nil {
+		return err
+	}
+	o.log.Printf("shutdown control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ","))
+
+	o.reportFluxSource(ctx, "")
+
+	skipEtcd := opts.SkipEtcdSnapshot || o.cfg.Shutdown.SkipEtcdSnapshot
+	if !skipEtcd {
+		o.bestEffort("etcd snapshot", func() error {
+			return o.takeEtcdSnapshot(ctx, o.cfg.ControlPlanes[0])
+		})
+	}
+
+	o.bestEffort("suspend flux", func() error { return o.patchFluxSuspendAll(ctx, true) })
+	o.bestEffort("scale down apps", func() error { return o.scaleDownApps(ctx) })
+
+	skipDrain := opts.SkipDrain || o.cfg.Shutdown.SkipDrain
+	if !skipDrain {
+		o.bestEffort("drain workers", func() error { return o.drainWorkers(ctx, workers) })
+	}
+
+	o.stopWorkers(ctx, workers)
+	o.stopControlPlanes(ctx, o.cfg.ControlPlanes)
+	o.log.Printf("shutdown flow complete")
+	return nil
+}
+
+func (o *Orchestrator) EstimatedShutdownSeconds() int {
+	return o.store.ShutdownP95(o.cfg.Shutdown.DefaultBudgetSeconds)
+}
+
+func (o *Orchestrator) finalizeRecord(record *state.RunRecord, err *error) {
+	record.EndedAt = time.Now().UTC()
+	record.DurationSeconds = int(record.EndedAt.Sub(record.StartedAt).Seconds())
+	record.Success = *err == nil
+	if *err != nil {
+		record.Error = (*err).Error()
+	}
+	if appendErr := o.store.Append(*record); appendErr != nil {
+		o.log.Printf("warning: append run record failed: %v", appendErr)
+	}
+}
+
+func (o *Orchestrator) effectiveWorkers(ctx context.Context) ([]string, error) {
+	if len(o.cfg.Workers) > 0 {
+		return append([]string{}, o.cfg.Workers...), nil
+	}
+	return o.discoverWorkers(ctx)
+}
+
+func (o *Orchestrator) discoverWorkers(ctx context.Context) ([]string, error) {
+	out, err := o.kubectl(ctx, 15*time.Second,
+		"get", "nodes",
+		"-o", "custom-columns=NAME:.metadata.name,CP:.metadata.labels.node-role\\.kubernetes\\.io/control-plane,MASTER:.metadata.labels.node-role\\.kubernetes\\.io/master",
+		"--no-headers",
+	)
+	if err != nil {
+		return nil, fmt.Errorf("discover workers: %w", err)
+	}
+	var workers []string
+	for _, line := range lines(out) {
+		fields := strings.Fields(line)
+		if len(fields) < 3 {
+			continue
+		}
+		// kubectl custom-columns prints "<none>" (not an empty string) when a label is absent.
+		if (fields[1] == "" || fields[1] == "<none>") && (fields[2] == "" || fields[2] == "<none>") {
+			workers =
append(workers, fields[0]) + } + } + if len(workers) == 0 { + return nil, fmt.Errorf("no workers discovered") + } + return workers, nil +} + +func (o *Orchestrator) patchFluxSuspendAll(ctx context.Context, suspend bool) error { + patch := fmt.Sprintf(`{"spec":{"suspend":%t}}`, suspend) + + ksOut, err := o.kubectl(ctx, 20*time.Second, + "-n", "flux-system", "get", "kustomizations.kustomize.toolkit.fluxcd.io", + "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}", + ) + if err != nil { + return err + } + for _, ks := range lines(ksOut) { + _, patchErr := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "kustomization", ks, "--type=merge", "-p", patch) + if patchErr != nil { + o.log.Printf("warning: patch kustomization %s failed: %v", ks, patchErr) + } + } + + hrOut, err := o.kubectl(ctx, 25*time.Second, + "get", "helmreleases.helm.toolkit.fluxcd.io", "-A", + "-o", "jsonpath={range .items[*]}{.metadata.namespace}{'/'}{.metadata.name}{'\\n'}{end}", + ) + if err != nil { + return err + } + for _, hr := range lines(hrOut) { + parts := strings.SplitN(hr, "/", 2) + if len(parts) != 2 { + continue + } + _, patchErr := o.kubectl(ctx, 20*time.Second, "-n", parts[0], "patch", "helmrelease", parts[1], "--type=merge", "-p", patch) + if patchErr != nil { + o.log.Printf("warning: patch helmrelease %s failed: %v", hr, patchErr) + } + } + return nil +} + +func (o *Orchestrator) scaleDownApps(ctx context.Context) error { + nsOut, err := o.kubectl(ctx, 15*time.Second, "get", "ns", "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}") + if err != nil { + return err + } + exclude := map[string]struct{}{} + for _, ns := range o.cfg.ExcludedNamespaces { + exclude[ns] = struct{}{} + } + for _, ns := range lines(nsOut) { + if _, ok := exclude[ns]; ok { + continue + } + if _, scaleErr := o.kubectl(ctx, 15*time.Second, "-n", ns, "scale", "deployment", "--all", "--replicas=0"); scaleErr != nil { + o.log.Printf("warning: scale deployments in %s failed: %v", ns, scaleErr) + } + if _, scaleErr := o.kubectl(ctx, 15*time.Second, "-n", ns, "scale", "statefulset", "--all", "--replicas=0"); scaleErr != nil { + o.log.Printf("warning: scale statefulsets in %s failed: %v", ns, scaleErr) + } + } + return nil +} + +func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error { + for _, node := range workers { + if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node); err != nil { + o.log.Printf("warning: cordon %s failed: %v", node, err) + } + if _, err := o.kubectl(ctx, 3*time.Minute, "drain", node, "--ignore-daemonsets", "--delete-emptydir-data", "--grace-period=30", "--timeout=180s"); err != nil { + o.log.Printf("warning: drain %s failed: %v", node, err) + } + } + return nil +} + +func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) { + for _, n := range workers { + o.bestEffort("stop k3s-agent on "+n, func() error { + _, err := o.ssh(ctx, n, "sudo systemctl stop k3s-agent || true") + return err + }) + } +} + +func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) { + for _, n := range workers { + o.bestEffort("start k3s-agent on "+n, func() error { + _, err := o.ssh(ctx, n, "sudo systemctl start k3s-agent || true") + return err + }) + } +} + +func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) { + for _, n := range cps { + o.bestEffort("stop k3s on "+n, func() error { + _, err := o.ssh(ctx, n, "sudo systemctl stop k3s || true") + return err + }) + } +} + +func (o *Orchestrator) startControlPlanes(ctx 
context.Context, cps []string) {
+	for _, n := range cps {
+		o.bestEffort("start k3s on "+n, func() error {
+			_, err := o.ssh(ctx, n, "sudo systemctl start k3s || true")
+			return err
+		})
+	}
+}
+
+func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error {
+	name := "pre-shutdown-" + time.Now().UTC().Format("20060102-150405")
+	_, err := o.ssh(ctx, node, "sudo k3s etcd-snapshot save --name "+name)
+	return err
+}
+
+func (o *Orchestrator) waitForAPI(ctx context.Context, attempts int, sleep time.Duration) error {
+	if o.runner.DryRun {
+		return nil
+	}
+	for i := 0; i < attempts; i++ {
+		_, err := o.kubectl(ctx, 5*time.Second, "version", "--request-timeout=5s")
+		if err == nil {
+			return nil
+		}
+		// Sleep between probes without ignoring cancellation, so SIGTERM aborts the wait promptly.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(sleep):
+		}
+	}
+	return fmt.Errorf("kubernetes API did not become reachable within timeout")
+}
+
+func (o *Orchestrator) fluxSourceReady(ctx context.Context) (bool, error) {
+	out, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}")
+	if err != nil {
+		return false, err
+	}
+	return strings.Contains(out, "True"), nil
+}
+
+func (o *Orchestrator) reportFluxSource(ctx context.Context, forceBranch string) {
+	urlOut, urlErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.url}")
+	if urlErr == nil {
+		o.log.Printf("flux-source-url=%s", strings.TrimSpace(urlOut))
+	}
+	branchOut, branchErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}")
+	if branchErr == nil {
+		branch := strings.TrimSpace(branchOut)
+		o.log.Printf("flux-source-branch=%s", branch)
+		if forceBranch == "" && branch != o.cfg.ExpectedFluxBranch {
+			o.log.Printf("warning: flux source branch is '%s', expected '%s'", branch, o.cfg.ExpectedFluxBranch)
+		}
+	}
+}
+
+func (o *Orchestrator) bootstrapLocal(ctx context.Context) error {
+	for _, rel := range o.cfg.LocalBootstrapPaths {
+		full := filepath.Join(o.cfg.IACRepoPath, rel)
+		o.log.Printf("local bootstrap apply -k %s", full)
+		if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-k", full); err != nil {
+			return fmt.Errorf("local bootstrap apply failed at %s: %w", full, err)
+		}
+	}
+	return nil
+}
+
+func (o *Orchestrator) resumeFluxAndReconcile(ctx context.Context) error {
+	if err := o.patchFluxSuspendAll(ctx, false); err != nil {
+		return err
+	}
+
+	if o.runner.CommandExists("flux") {
+		commands := [][]string{
+			{"reconcile", "source", "git", "flux-system", "-n", "flux-system", "--timeout=3m"},
+			{"reconcile", "kustomization", "core", "-n", "flux-system", "--with-source", "--timeout=5m"},
+			{"reconcile", "kustomization", "helm", "-n", "flux-system", "--with-source", "--timeout=5m"},
+			{"reconcile", "kustomization", "traefik", "-n", "flux-system", "--with-source", "--timeout=5m"},
+			{"reconcile", "kustomization", "vault", "-n", "flux-system", "--with-source", "--timeout=10m"},
+			{"reconcile", "kustomization", "postgres", "-n", "flux-system", "--with-source", "--timeout=10m"},
+			{"reconcile", "kustomization", "gitea", "-n", "flux-system", "--with-source", "--timeout=10m"},
+		}
+		for _, c := range commands {
+			// The wrapper deadline must exceed the longest flux --timeout above (10m);
+			// at 3m the context would kill flux before its own timeout could fire.
+			if _, err := o.run(ctx, 12*time.Minute, "flux", c...); err != nil {
+				o.log.Printf("warning: flux command failed (%s): %v", strings.Join(c, " "), err)
+			}
+		}
+		return nil
+	}
+
+	now := time.Now().UTC().Format(time.RFC3339)
+	_, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system",
"annotate", "kustomizations.kustomize.toolkit.fluxcd.io", "--all", "reconcile.fluxcd.io/requestedAt="+now, "--overwrite") + return err +} + +func (o *Orchestrator) kubectl(ctx context.Context, timeout time.Duration, args ...string) (string, error) { + return o.run(ctx, timeout, "kubectl", args...) +} + +func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (string, error) { + target := node + if o.cfg.SSHUser != "" { + target = o.cfg.SSHUser + "@" + node + } + return o.run(ctx, 45*time.Second, "ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=8", target, command) +} + +func (o *Orchestrator) run(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { + runCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + return o.runner.Run(runCtx, name, args...) +} + +func lines(in string) []string { + in = strings.TrimSpace(in) + if in == "" { + return nil + } + parts := strings.Split(in, "\n") + out := make([]string, 0, len(parts)) + for _, p := range parts { + v := strings.TrimSpace(p) + if v != "" { + out = append(out, v) + } + } + return out +} + +func (o *Orchestrator) bestEffort(name string, fn func() error) { + if err := fn(); err != nil { + o.log.Printf("warning: %s: %v", name, err) + } +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..3e32a42 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,172 @@ +package config + +import ( + "fmt" + "os" + + "gopkg.in/yaml.v3" +) + +type Config struct { + Kubeconfig string `yaml:"kubeconfig"` + SSHUser string `yaml:"ssh_user"` + IACRepoPath string `yaml:"iac_repo_path"` + ExpectedFluxBranch string `yaml:"expected_flux_branch"` + ControlPlanes []string `yaml:"control_planes"` + Workers []string `yaml:"workers"` + LocalBootstrapPaths []string `yaml:"local_bootstrap_paths"` + ExcludedNamespaces []string `yaml:"excluded_namespaces"` + Shutdown Shutdown `yaml:"shutdown"` + UPS UPS `yaml:"ups"` + State State `yaml:"state"` +} + +type Shutdown struct { + DefaultBudgetSeconds int `yaml:"default_budget_seconds"` + SkipEtcdSnapshot bool `yaml:"skip_etcd_snapshot"` + SkipDrain bool `yaml:"skip_drain"` +} + +type UPS struct { + Enabled bool `yaml:"enabled"` + Provider string `yaml:"provider"` + Target string `yaml:"target"` + PollSeconds int `yaml:"poll_seconds"` + RuntimeSafetyFactor float64 `yaml:"runtime_safety_factor"` + DebounceCount int `yaml:"debounce_count"` + TelemetryTimeoutSeconds int `yaml:"telemetry_timeout_seconds"` +} + +type State struct { + Dir string `yaml:"dir"` + RunHistoryPath string `yaml:"run_history_path"` + LockPath string `yaml:"lock_path"` +} + +func Load(path string) (Config, error) { + cfg := defaults() + + b, err := os.ReadFile(path) + if err != nil { + return Config{}, fmt.Errorf("read config %s: %w", path, err) + } + if err := yaml.Unmarshal(b, &cfg); err != nil { + return Config{}, fmt.Errorf("decode config %s: %w", path, err) + } + + cfg.applyDefaults() + if err := cfg.Validate(); err != nil { + return Config{}, err + } + return cfg, nil +} + +func (c Config) Validate() error { + if len(c.ControlPlanes) == 0 { + return fmt.Errorf("config.control_planes must not be empty") + } + if c.ExpectedFluxBranch == "" { + return fmt.Errorf("config.expected_flux_branch must not be empty") + } + if c.IACRepoPath == "" { + return fmt.Errorf("config.iac_repo_path must not be empty") + } + if c.Shutdown.DefaultBudgetSeconds <= 0 { + return fmt.Errorf("config.shutdown.default_budget_seconds must be > 0") + } + if 
c.UPS.Enabled { + if c.UPS.Provider == "" { + return fmt.Errorf("config.ups.provider must not be empty when ups is enabled") + } + if c.UPS.Target == "" { + return fmt.Errorf("config.ups.target must not be empty when ups is enabled") + } + } + if c.State.RunHistoryPath == "" || c.State.LockPath == "" { + return fmt.Errorf("config.state.run_history_path and config.state.lock_path must not be empty") + } + return nil +} + +func defaults() Config { + c := Config{ + IACRepoPath: "/opt/titan-iac", + ExpectedFluxBranch: "main", + ControlPlanes: []string{"titan-0a", "titan-0b", "titan-0c"}, + LocalBootstrapPaths: []string{ + "infrastructure/core", + "infrastructure/sources/helm", + "infrastructure/metallb", + "infrastructure/traefik", + "infrastructure/vault-csi", + "infrastructure/vault-injector", + "services/vault", + "infrastructure/postgres", + "services/gitea", + }, + ExcludedNamespaces: []string{ + "kube-system", + "kube-public", + "kube-node-lease", + "flux-system", + "traefik", + "metallb-system", + "cert-manager", + "longhorn-system", + "vault", + "postgres", + "maintenance", + }, + Shutdown: Shutdown{ + DefaultBudgetSeconds: 300, + }, + UPS: UPS{ + Enabled: true, + Provider: "nut", + PollSeconds: 5, + RuntimeSafetyFactor: 1.10, + DebounceCount: 3, + TelemetryTimeoutSeconds: 90, + }, + State: State{ + Dir: "/var/lib/hecate", + RunHistoryPath: "/var/lib/hecate/runs.json", + LockPath: "/var/lib/hecate/hecate.lock", + }, + } + c.applyDefaults() + return c +} + +func (c *Config) applyDefaults() { + if c.ExpectedFluxBranch == "" { + c.ExpectedFluxBranch = "main" + } + if c.IACRepoPath == "" { + c.IACRepoPath = "/opt/titan-iac" + } + if c.Shutdown.DefaultBudgetSeconds <= 0 { + c.Shutdown.DefaultBudgetSeconds = 300 + } + if c.UPS.PollSeconds <= 0 { + c.UPS.PollSeconds = 5 + } + if c.UPS.RuntimeSafetyFactor <= 0 { + c.UPS.RuntimeSafetyFactor = 1.10 + } + if c.UPS.DebounceCount <= 0 { + c.UPS.DebounceCount = 3 + } + if c.UPS.TelemetryTimeoutSeconds <= 0 { + c.UPS.TelemetryTimeoutSeconds = 90 + } + if c.State.Dir == "" { + c.State.Dir = "/var/lib/hecate" + } + if c.State.RunHistoryPath == "" { + c.State.RunHistoryPath = "/var/lib/hecate/runs.json" + } + if c.State.LockPath == "" { + c.State.LockPath = "/var/lib/hecate/hecate.lock" + } +} diff --git a/internal/execx/runner.go b/internal/execx/runner.go new file mode 100644 index 0000000..c1b3231 --- /dev/null +++ b/internal/execx/runner.go @@ -0,0 +1,49 @@ +package execx + +import ( + "context" + "fmt" + "log" + "os" + "os/exec" + "strings" +) + +type Runner struct { + DryRun bool + Kubeconfig string + Logger *log.Logger +} + +func (r *Runner) Run(ctx context.Context, name string, args ...string) (string, error) { + if r.DryRun { + r.logf("DRY-RUN: %s %s", name, strings.Join(args, " ")) + return "", nil + } + + cmd := exec.CommandContext(ctx, name, args...) + cmd.Env = os.Environ() + if r.Kubeconfig != "" { + cmd.Env = append(cmd.Env, "KUBECONFIG="+r.Kubeconfig) + } + out, err := cmd.CombinedOutput() + trimmed := strings.TrimSpace(string(out)) + if err != nil { + if trimmed == "" { + return "", fmt.Errorf("%s %s: %w", name, strings.Join(args, " "), err) + } + return trimmed, fmt.Errorf("%s %s: %w", name, strings.Join(args, " "), err) + } + return trimmed, nil +} + +func (r *Runner) CommandExists(name string) bool { + _, err := exec.LookPath(name) + return err == nil +} + +func (r *Runner) logf(format string, args ...any) { + if r.Logger != nil { + r.Logger.Printf(format, args...) 
+ } +} diff --git a/internal/service/daemon.go b/internal/service/daemon.go new file mode 100644 index 0000000..91d8018 --- /dev/null +++ b/internal/service/daemon.go @@ -0,0 +1,94 @@ +package service + +import ( + "context" + "fmt" + "log" + "math" + "time" + + "scm.bstein.dev/bstein/hecate/internal/cluster" + "scm.bstein.dev/bstein/hecate/internal/config" + "scm.bstein.dev/bstein/hecate/internal/ups" +) + +type Daemon struct { + cfg config.Config + orch *cluster.Orchestrator + ups ups.Provider + log *log.Logger +} + +func NewDaemon(cfg config.Config, orch *cluster.Orchestrator, provider ups.Provider, logger *log.Logger) *Daemon { + return &Daemon{cfg: cfg, orch: orch, ups: provider, log: logger} +} + +func (d *Daemon) Run(ctx context.Context) error { + if !d.cfg.UPS.Enabled { + return fmt.Errorf("ups monitoring is disabled in config") + } + + poll := time.Duration(d.cfg.UPS.PollSeconds) * time.Second + if poll <= 0 { + poll = 5 * time.Second + } + telemetryTimeout := time.Duration(d.cfg.UPS.TelemetryTimeoutSeconds) * time.Second + if telemetryTimeout <= 0 { + telemetryTimeout = 90 * time.Second + } + debounce := d.cfg.UPS.DebounceCount + if debounce <= 0 { + debounce = 3 + } + + lastGood := time.Now() + lastOnBattery := false + breachCount := 0 + + t := time.NewTicker(poll) + defer t.Stop() + + d.log.Printf("hecate daemon started: poll=%s debounce=%d telemetry_timeout=%s", poll, debounce, telemetryTimeout) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + sample, err := d.ups.Read(ctx) + if err != nil { + d.log.Printf("warning: ups read failed: %v", err) + if lastOnBattery && time.Since(lastGood) > telemetryTimeout { + d.log.Printf("ups telemetry timeout while on battery, triggering shutdown") + return d.triggerShutdown(ctx, "ups-telemetry-timeout") + } + continue + } + + lastGood = time.Now() + lastOnBattery = sample.OnBattery + + budget := d.orch.EstimatedShutdownSeconds() + threshold := int(math.Ceil(float64(budget) * d.cfg.UPS.RuntimeSafetyFactor)) + trigger := sample.LowBattery || (sample.OnBattery && sample.RuntimeSeconds > 0 && sample.RuntimeSeconds <= threshold) + + d.log.Printf("ups status=%s on_battery=%t runtime_s=%d threshold_s=%d budget_s=%d trigger=%t", + sample.RawStatus, sample.OnBattery, sample.RuntimeSeconds, threshold, budget, trigger) + + if trigger { + breachCount++ + if breachCount >= debounce { + reason := fmt.Sprintf("ups-threshold runtime=%ds threshold=%ds status=%s", sample.RuntimeSeconds, threshold, sample.RawStatus) + return d.triggerShutdown(ctx, reason) + } + } else { + breachCount = 0 + } + } + } +} + +func (d *Daemon) triggerShutdown(ctx context.Context, reason string) error { + d.log.Printf("triggering shutdown: %s", reason) + return d.orch.Shutdown(ctx, cluster.ShutdownOptions{Reason: reason}) +} diff --git a/internal/service/daemon_test.go b/internal/service/daemon_test.go new file mode 100644 index 0000000..6dabff9 --- /dev/null +++ b/internal/service/daemon_test.go @@ -0,0 +1,7 @@ +package service + +import "testing" + +func TestPlaceholder(t *testing.T) { + // Placeholder test keeps package-level test coverage active. 
+} diff --git a/internal/state/store.go b/internal/state/store.go new file mode 100644 index 0000000..ce1c5f6 --- /dev/null +++ b/internal/state/store.go @@ -0,0 +1,128 @@ +package state + +import ( + "encoding/json" + "fmt" + "math" + "os" + "path/filepath" + "sort" + "sync" + "time" +) + +type RunRecord struct { + ID string `json:"id"` + Action string `json:"action"` + Reason string `json:"reason,omitempty"` + StartedAt time.Time `json:"started_at"` + EndedAt time.Time `json:"ended_at"` + DurationSeconds int `json:"duration_seconds"` + Success bool `json:"success"` + Error string `json:"error,omitempty"` +} + +type Store struct { + path string + mu sync.Mutex +} + +func New(path string) *Store { + return &Store{path: path} +} + +func EnsureDir(dir string) error { + if dir == "" { + return fmt.Errorf("state dir must not be empty") + } + return os.MkdirAll(dir, 0o750) +} + +func AcquireLock(path string) (func(), error) { + if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil { + return nil, err + } + f, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600) + if err != nil { + return nil, fmt.Errorf("acquire lock %s: %w", path, err) + } + _, _ = f.WriteString(fmt.Sprintf("pid=%d started=%s\n", os.Getpid(), time.Now().Format(time.RFC3339))) + _ = f.Close() + return func() { + _ = os.Remove(path) + }, nil +} + +func (s *Store) Append(record RunRecord) error { + s.mu.Lock() + defer s.mu.Unlock() + + records, err := s.loadUnlocked() + if err != nil { + return err + } + records = append(records, record) + if len(records) > 200 { + records = records[len(records)-200:] + } + if err := os.MkdirAll(filepath.Dir(s.path), 0o750); err != nil { + return err + } + b, err := json.MarshalIndent(records, "", " ") + if err != nil { + return err + } + return os.WriteFile(s.path, b, 0o640) +} + +func (s *Store) Load() ([]RunRecord, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.loadUnlocked() +} + +func (s *Store) loadUnlocked() ([]RunRecord, error) { + b, err := os.ReadFile(s.path) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + if len(b) == 0 { + return nil, nil + } + var records []RunRecord + if err := json.Unmarshal(b, &records); err != nil { + return nil, err + } + return records, nil +} + +func (s *Store) ShutdownP95(defaultSeconds int) int { + records, err := s.Load() + if err != nil { + return defaultSeconds + } + var d []int + for _, r := range records { + if r.Action == "shutdown" && r.Success && r.DurationSeconds > 0 { + d = append(d, r.DurationSeconds) + } + } + if len(d) == 0 { + return defaultSeconds + } + sort.Ints(d) + idx := int(math.Ceil(0.95*float64(len(d)))) - 1 + if idx < 0 { + idx = 0 + } + if idx >= len(d) { + idx = len(d) - 1 + } + if d[idx] <= 0 { + return defaultSeconds + } + return d[idx] +} diff --git a/internal/ups/nut.go b/internal/ups/nut.go new file mode 100644 index 0000000..2835d9c --- /dev/null +++ b/internal/ups/nut.go @@ -0,0 +1,86 @@ +package ups + +import ( + "bufio" + "context" + "fmt" + "os/exec" + "strconv" + "strings" +) + +type Sample struct { + OnBattery bool + LowBattery bool + RuntimeSeconds int + RawStatus string +} + +type Provider interface { + Read(context.Context) (Sample, error) +} + +type NUTProvider struct { + Target string +} + +func NewNUTProvider(target string) *NUTProvider { + return &NUTProvider{Target: target} +} + +func (p *NUTProvider) Read(ctx context.Context) (Sample, error) { + if p.Target == "" { + return Sample{}, fmt.Errorf("NUT target must not be empty") + } + cmd := 
exec.CommandContext(ctx, "upsc", p.Target) + out, err := cmd.Output() + if err != nil { + return Sample{}, fmt.Errorf("upsc %s: %w", p.Target, err) + } + return parseNUT(string(out)) +} + +func parseNUT(raw string) (Sample, error) { + kv := map[string]string{} + s := bufio.NewScanner(strings.NewReader(raw)) + for s.Scan() { + line := strings.TrimSpace(s.Text()) + if line == "" { + continue + } + parts := strings.SplitN(line, ":", 2) + if len(parts) != 2 { + continue + } + k := strings.TrimSpace(parts[0]) + v := strings.TrimSpace(parts[1]) + kv[k] = v + } + if err := s.Err(); err != nil { + return Sample{}, err + } + + status := kv["ups.status"] + if status == "" { + return Sample{}, fmt.Errorf("ups.status missing in NUT output") + } + flags := strings.Fields(status) + out := Sample{RawStatus: status} + for _, f := range flags { + switch strings.ToUpper(f) { + case "OB": + out.OnBattery = true + case "OL": + out.OnBattery = false + case "LB": + out.LowBattery = true + } + } + if runtimeRaw := kv["battery.runtime"]; runtimeRaw != "" { + runtime, err := strconv.Atoi(strings.Split(runtimeRaw, ".")[0]) + if err == nil { + out.RuntimeSeconds = runtime + } + } + return out, nil +} diff --git a/internal/ups/nut_test.go b/internal/ups/nut_test.go new file mode 100644 index 0000000..5428e13 --- /dev/null +++ b/internal/ups/nut_test.go @@ -0,0 +1,22 @@ +package ups + +import "testing" + +func TestParseNUT(t *testing.T) { + raw := `battery.runtime: 384 +ups.status: OB LB +` + s, err := parseNUT(raw) + if err != nil { + t.Fatalf("parseNUT returned error: %v", err) + } + if !s.OnBattery { + t.Fatalf("expected OnBattery=true") + } + if !s.LowBattery { + t.Fatalf("expected LowBattery=true") + } + if s.RuntimeSeconds != 384 { + t.Fatalf("expected runtime 384, got %d", s.RuntimeSeconds) + } +} diff --git a/scripts/install.sh b/scripts/install.sh new file mode 100755 index 0000000..7a845c0 --- /dev/null +++ b/scripts/install.sh @@ -0,0 +1,65 @@ +#!/usr/bin/env bash +set -euo pipefail + +if [[ "${EUID}" -ne 0 ]]; then + echo "Run as root: sudo ./scripts/install.sh" >&2 + exit 1 +fi + +REPO_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +BIN_DIR="/usr/local/bin" +CONF_DIR="/etc/hecate" +STATE_DIR="/var/lib/hecate" +SYSTEMD_DIR="/etc/systemd/system" +START_NOW=1 + +while [[ $# -gt 0 ]]; do + case "$1" in + --no-start) + START_NOW=0 + shift + ;; + *) + echo "Unknown argument: $1" >&2 + exit 1 + ;; + esac +done + +echo "[install] building hecate" +cd "${REPO_DIR}" +mkdir -p dist +go build -o dist/hecate ./cmd/hecate + +echo "[install] installing binary" +install -d -m 0755 "${BIN_DIR}" +install -m 0755 dist/hecate "${BIN_DIR}/hecate" + +echo "[install] installing config + state dirs" +install -d -m 0750 "${CONF_DIR}" +install -d -m 0750 "${STATE_DIR}" +if [[ ! -f "${CONF_DIR}/hecate.yaml" ]]; then + install -m 0640 configs/hecate.example.yaml "${CONF_DIR}/hecate.yaml" + echo "[install] wrote default config to ${CONF_DIR}/hecate.yaml" +else + echo "[install] keeping existing config at ${CONF_DIR}/hecate.yaml" +fi + +echo "[install] installing systemd units" +install -m 0644 deploy/systemd/hecate.service "${SYSTEMD_DIR}/hecate.service" +install -m 0644 deploy/systemd/hecate-bootstrap.service "${SYSTEMD_DIR}/hecate-bootstrap.service" + +systemctl daemon-reload +systemctl enable hecate.service hecate-bootstrap.service + +if [[ "${START_NOW}" -eq 1 ]]; then + systemctl restart hecate.service + echo "[install] hecate.service restarted" +fi + +echo "[install] done" +echo "Next steps:" +echo " 1. 
Edit /etc/hecate/hecate.yaml" +echo " 2. Run: hecate status --config /etc/hecate/hecate.yaml" +echo " 3. Test dry run: hecate startup --config /etc/hecate/hecate.yaml" +echo " 4. Trigger bootstrap now: systemctl start hecate-bootstrap.service"