// File: ananke/internal/cluster/orchestrator.go

package cluster
import (
"context"
"fmt"
"log"
"path/filepath"
"strings"
"time"
"scm.bstein.dev/bstein/hecate/internal/config"
"scm.bstein.dev/bstein/hecate/internal/execx"
"scm.bstein.dev/bstein/hecate/internal/state"
)
// Orchestrator drives cluster startup and shutdown flows by shelling out to
// kubectl, ssh, and flux through the injected runner.
type Orchestrator struct {
	cfg    config.Config // cluster topology, flux, shutdown, and state settings
	runner *execx.Runner // external command executor (exposes DryRun and CommandExists)
	store  *state.Store  // persisted run records and shutdown timing stats
	log    *log.Logger   // destination for progress and warning output
}
// StartupOptions tunes a single Startup invocation.
type StartupOptions struct {
	ForceFluxBranch    string // when non-empty, patch the flux GitRepository to track this branch
	SkipLocalBootstrap bool   // skip the local manifest fallback even if the flux source is not ready
	Reason             string // free-form reason recorded in the run history
}
// ShutdownOptions tunes a single Shutdown invocation. Each skip flag is
// OR-ed with its counterpart in the shutdown config.
type ShutdownOptions struct {
	SkipEtcdSnapshot bool   // skip the pre-shutdown etcd snapshot
	SkipDrain        bool   // skip cordoning/draining worker nodes
	Reason           string // free-form reason recorded in the run history
}
// New assembles an Orchestrator from its collaborators.
func New(cfg config.Config, runner *execx.Runner, store *state.Store, logger *log.Logger) *Orchestrator {
	o := &Orchestrator{
		cfg:    cfg,
		runner: runner,
		store:  store,
		log:    logger,
	}
	return o
}
// Startup brings the cluster up: boots control planes and workers, waits for
// the Kubernetes API, optionally forces the flux branch or applies a local
// bootstrap when the flux source is not ready, then resumes flux and
// reconciles. Steps are order-dependent. A state lock serializes concurrent
// runs, and a run record is always appended via the deferred finalizeRecord.
func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err error) {
	unlock, err := state.AcquireLock(o.cfg.State.LockPath)
	if err != nil {
		return err
	}
	defer unlock()
	record := state.RunRecord{
		ID:        fmt.Sprintf("startup-%d", time.Now().UnixNano()),
		Action:    "startup",
		Reason:    opts.Reason,
		StartedAt: time.Now().UTC(),
	}
	// err is a named return so the deferred finalize sees the final outcome.
	defer o.finalizeRecord(&record, &err)
	workers, err := o.effectiveWorkers(ctx)
	if err != nil {
		return err
	}
	o.log.Printf("startup control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ","))
	// Best-effort report; kubectl likely fails here while the API is still down.
	o.reportFluxSource(ctx, opts.ForceFluxBranch)
	o.startControlPlanes(ctx, o.cfg.ControlPlanes)
	o.startWorkers(ctx, workers)
	// Up to 120 probes, 2s apart, for the API server to answer.
	if err := o.waitForAPI(ctx, 120, 2*time.Second); err != nil {
		return err
	}
	if opts.ForceFluxBranch != "" {
		patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, opts.ForceFluxBranch)
		if _, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "gitrepository", "flux-system", "--type=merge", "-p", patch); err != nil {
			return fmt.Errorf("force flux branch: %w", err)
		}
	}
	if !opts.SkipLocalBootstrap {
		ready, readyErr := o.fluxSourceReady(ctx)
		if readyErr != nil {
			o.log.Printf("warning: unable to read flux source readiness: %v", readyErr)
		}
		// Fall back to applying local manifests when flux cannot pull its source.
		if !ready {
			o.log.Printf("flux source not ready, applying local bootstrap path")
			if err := o.bootstrapLocal(ctx); err != nil {
				return err
			}
		}
	}
	if err := o.resumeFluxAndReconcile(ctx); err != nil {
		return err
	}
	o.log.Printf("startup flow complete")
	return nil
}
// Shutdown winds the cluster down: snapshot etcd, suspend flux, scale apps
// to zero, drain and stop workers, then stop the control planes. Most steps
// are best-effort so a partially unhealthy cluster can still be powered off.
// A state lock serializes concurrent runs, and a run record is always
// appended via the deferred finalizeRecord.
func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error) {
	unlock, err := state.AcquireLock(o.cfg.State.LockPath)
	if err != nil {
		return err
	}
	defer unlock()
	record := state.RunRecord{
		ID:        fmt.Sprintf("shutdown-%d", time.Now().UnixNano()),
		Action:    "shutdown",
		Reason:    opts.Reason,
		StartedAt: time.Now().UTC(),
	}
	defer o.finalizeRecord(&record, &err)
	workers, err := o.effectiveWorkers(ctx)
	if err != nil {
		return err
	}
	o.log.Printf("shutdown control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ","))
	o.reportFluxSource(ctx, "")
	skipEtcd := opts.SkipEtcdSnapshot || o.cfg.Shutdown.SkipEtcdSnapshot
	if !skipEtcd {
		// Guard the [0] index: an empty control-plane list would panic here.
		if len(o.cfg.ControlPlanes) == 0 {
			o.log.Printf("warning: etcd snapshot skipped: no control planes configured")
		} else {
			o.bestEffort("etcd snapshot", func() error {
				return o.takeEtcdSnapshot(ctx, o.cfg.ControlPlanes[0])
			})
		}
	}
	o.bestEffort("suspend flux", func() error { return o.patchFluxSuspendAll(ctx, true) })
	o.bestEffort("scale down apps", func() error { return o.scaleDownApps(ctx) })
	skipDrain := opts.SkipDrain || o.cfg.Shutdown.SkipDrain
	if !skipDrain {
		o.bestEffort("drain workers", func() error { return o.drainWorkers(ctx, workers) })
	}
	o.stopWorkers(ctx, workers)
	o.stopControlPlanes(ctx, o.cfg.ControlPlanes)
	o.log.Printf("shutdown flow complete")
	return nil
}
// EstimatedShutdownSeconds returns the store's P95 shutdown duration,
// passing the configured default budget as its fallback value.
func (o *Orchestrator) EstimatedShutdownSeconds() int {
	fallback := o.cfg.Shutdown.DefaultBudgetSeconds
	return o.store.ShutdownP95(fallback)
}
// finalizeRecord stamps the record's end time, duration, and success/error
// state from *err, then appends it to the store. Append failures are only
// logged — a bad history write must not mask the run's own result.
func (o *Orchestrator) finalizeRecord(record *state.RunRecord, err *error) {
	now := time.Now().UTC()
	record.EndedAt = now
	record.DurationSeconds = int(now.Sub(record.StartedAt).Seconds())
	if runErr := *err; runErr != nil {
		record.Success = false
		record.Error = runErr.Error()
	} else {
		record.Success = true
	}
	if appendErr := o.store.Append(*record); appendErr != nil {
		o.log.Printf("warning: append run record failed: %v", appendErr)
	}
}
// effectiveWorkers returns the configured worker list (copied so callers
// cannot mutate the config slice), falling back to kubectl-based discovery
// when no workers are configured.
func (o *Orchestrator) effectiveWorkers(ctx context.Context) ([]string, error) {
	if len(o.cfg.Workers) == 0 {
		return o.discoverWorkers(ctx)
	}
	workers := make([]string, len(o.cfg.Workers))
	copy(workers, o.cfg.Workers)
	return workers, nil
}
// discoverWorkers lists cluster nodes via kubectl and returns those carrying
// neither the control-plane nor the master role label. It errors when the
// listing fails or yields no workers.
func (o *Orchestrator) discoverWorkers(ctx context.Context) ([]string, error) {
	raw, err := o.kubectl(ctx, 15*time.Second,
		"get", "nodes",
		"-o", "custom-columns=NAME:.metadata.name,CP:.metadata.labels.node-role\\.kubernetes\\.io/control-plane,MASTER:.metadata.labels.node-role\\.kubernetes\\.io/master",
		"--no-headers",
	)
	if err != nil {
		return nil, fmt.Errorf("discover workers: %w", err)
	}
	var found []string
	for _, row := range lines(raw) {
		cols := strings.Fields(row)
		if len(cols) < 3 {
			continue
		}
		// Both role columns print "<none>" when the label is absent.
		if cols[1] != "<none>" || cols[2] != "<none>" {
			continue
		}
		found = append(found, cols[0])
	}
	if len(found) == 0 {
		return nil, fmt.Errorf("no workers discovered")
	}
	return found, nil
}
// patchFluxSuspendAll sets spec.suspend on every flux Kustomization in
// flux-system and every HelmRelease in all namespaces. Per-object patch
// failures are logged and skipped; only the list operations abort.
func (o *Orchestrator) patchFluxSuspendAll(ctx context.Context, suspend bool) error {
	body := fmt.Sprintf(`{"spec":{"suspend":%t}}`, suspend)
	ksNames, err := o.kubectl(ctx, 20*time.Second,
		"-n", "flux-system", "get", "kustomizations.kustomize.toolkit.fluxcd.io",
		"-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}",
	)
	if err != nil {
		return err
	}
	for _, name := range lines(ksNames) {
		if _, patchErr := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "kustomization", name, "--type=merge", "-p", body); patchErr != nil {
			o.log.Printf("warning: patch kustomization %s failed: %v", name, patchErr)
		}
	}
	hrEntries, err := o.kubectl(ctx, 25*time.Second,
		"get", "helmreleases.helm.toolkit.fluxcd.io", "-A",
		"-o", "jsonpath={range .items[*]}{.metadata.namespace}{'/'}{.metadata.name}{'\\n'}{end}",
	)
	if err != nil {
		return err
	}
	for _, entry := range lines(hrEntries) {
		// Entries are "namespace/name"; skip anything malformed.
		ns, name, ok := strings.Cut(entry, "/")
		if !ok {
			continue
		}
		if _, patchErr := o.kubectl(ctx, 20*time.Second, "-n", ns, "patch", "helmrelease", name, "--type=merge", "-p", body); patchErr != nil {
			o.log.Printf("warning: patch helmrelease %s failed: %v", entry, patchErr)
		}
	}
	return nil
}
// scaleDownApps scales every deployment and statefulset to zero replicas in
// all namespaces except the configured exclusions. Scaling failures are
// logged per namespace and the sweep continues.
func (o *Orchestrator) scaleDownApps(ctx context.Context) error {
	nsList, err := o.kubectl(ctx, 15*time.Second, "get", "ns", "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}")
	if err != nil {
		return err
	}
	skip := make(map[string]struct{}, len(o.cfg.ExcludedNamespaces))
	for _, name := range o.cfg.ExcludedNamespaces {
		skip[name] = struct{}{}
	}
	for _, ns := range lines(nsList) {
		if _, excluded := skip[ns]; excluded {
			continue
		}
		for _, kind := range []string{"deployment", "statefulset"} {
			if _, scaleErr := o.kubectl(ctx, 15*time.Second, "-n", ns, "scale", kind, "--all", "--replicas=0"); scaleErr != nil {
				o.log.Printf("warning: scale %ss in %s failed: %v", kind, ns, scaleErr)
			}
		}
	}
	return nil
}
// drainWorkers cordons then drains each worker node. Per-node failures are
// logged and the sweep continues; the function always returns nil so the
// whole pass reads as best-effort to its caller.
func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error {
	for _, name := range workers {
		if _, cordonErr := o.kubectl(ctx, 20*time.Second, "cordon", name); cordonErr != nil {
			o.log.Printf("warning: cordon %s failed: %v", name, cordonErr)
		}
		drainArgs := []string{"drain", name, "--ignore-daemonsets", "--delete-emptydir-data", "--grace-period=30", "--timeout=180s"}
		if _, drainErr := o.kubectl(ctx, 3*time.Minute, drainArgs...); drainErr != nil {
			o.log.Printf("warning: drain %s failed: %v", name, drainErr)
		}
	}
	return nil
}
// systemctlUnit runs "sudo systemctl <action> <unit>" on each node over ssh,
// logging (not propagating) per-node failures via bestEffort. The trailing
// "|| true" keeps the remote exit status zero when the unit is already in
// the requested state.
func (o *Orchestrator) systemctlUnit(ctx context.Context, action, unit string, nodes []string) {
	for _, n := range nodes {
		node := n
		o.bestEffort(action+" "+unit+" on "+node, func() error {
			_, err := o.ssh(ctx, node, "sudo systemctl "+action+" "+unit+" || true")
			return err
		})
	}
}

// stopWorkers stops the k3s-agent service on every worker node.
func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) {
	o.systemctlUnit(ctx, "stop", "k3s-agent", workers)
}

// startWorkers starts the k3s-agent service on every worker node.
func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) {
	o.systemctlUnit(ctx, "start", "k3s-agent", workers)
}

// stopControlPlanes stops the k3s service on every control-plane node.
func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) {
	o.systemctlUnit(ctx, "stop", "k3s", cps)
}

// startControlPlanes starts the k3s service on every control-plane node.
func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) {
	o.systemctlUnit(ctx, "start", "k3s", cps)
}
// takeEtcdSnapshot saves a timestamped etcd snapshot on the given
// control-plane node using the k3s CLI over ssh.
func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error {
	stamp := time.Now().UTC().Format("20060102-150405")
	cmd := "sudo k3s etcd-snapshot save --name pre-shutdown-" + stamp
	_, err := o.ssh(ctx, node, cmd)
	return err
}
// waitForAPI polls `kubectl version` until it succeeds, up to attempts tries
// spaced sleep apart, returning an error on exhaustion. In dry-run mode it
// returns immediately. Unlike a bare time.Sleep, the wait honors context
// cancellation and no sleep is wasted after the final failed attempt.
func (o *Orchestrator) waitForAPI(ctx context.Context, attempts int, sleep time.Duration) error {
	if o.runner.DryRun {
		return nil
	}
	for i := 0; i < attempts; i++ {
		if _, err := o.kubectl(ctx, 5*time.Second, "version", "--request-timeout=5s"); err == nil {
			return nil
		}
		if i == attempts-1 {
			break
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("waiting for kubernetes API: %w", ctx.Err())
		case <-time.After(sleep):
		}
	}
	return fmt.Errorf("kubernetes API did not become reachable within timeout")
}
// fluxSourceReady reports whether the flux-system GitRepository carries a
// Ready condition whose status contains "True".
func (o *Orchestrator) fluxSourceReady(ctx context.Context) (bool, error) {
	status, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}")
	if err != nil {
		return false, err
	}
	ready := strings.Contains(status, "True")
	return ready, nil
}
// reportFluxSource logs the flux GitRepository URL and branch, warning when
// the branch differs from the configured expectation — unless a forced
// branch is about to be applied. Read failures are silently ignored.
func (o *Orchestrator) reportFluxSource(ctx context.Context, forceBranch string) {
	if url, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.url}"); err == nil {
		o.log.Printf("flux-source-url=%s", strings.TrimSpace(url))
	}
	raw, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}")
	if err != nil {
		return
	}
	branch := strings.TrimSpace(raw)
	o.log.Printf("flux-source-branch=%s", branch)
	if forceBranch == "" && branch != o.cfg.ExpectedFluxBranch {
		o.log.Printf("warning: flux source branch is '%s', expected '%s'", branch, o.cfg.ExpectedFluxBranch)
	}
}
// bootstrapLocal applies each configured kustomize path from the local IAC
// repository checkout, aborting at the first failure.
func (o *Orchestrator) bootstrapLocal(ctx context.Context) error {
	for _, rel := range o.cfg.LocalBootstrapPaths {
		target := filepath.Join(o.cfg.IACRepoPath, rel)
		o.log.Printf("local bootstrap apply -k %s", target)
		_, err := o.kubectl(ctx, 2*time.Minute, "apply", "-k", target)
		if err != nil {
			return fmt.Errorf("local bootstrap apply failed at %s: %w", target, err)
		}
	}
	return nil
}
// resumeFluxAndReconcile un-suspends all flux objects, then triggers
// reconciliation: via the flux CLI when available (failures logged, not
// fatal), otherwise by annotating all Kustomizations with a requestedAt
// timestamp.
func (o *Orchestrator) resumeFluxAndReconcile(ctx context.Context) error {
	if err := o.patchFluxSuspendAll(ctx, false); err != nil {
		return err
	}
	if o.runner.CommandExists("flux") {
		// Each command carries its own flux-side --timeout; the outer exec
		// timeout must exceed it, otherwise the runner kills commands flux
		// was told to wait 5-10 minutes on. (The original flat 3m budget
		// truncated the 5m/10m reconciles.)
		steps := []struct {
			args    []string
			timeout time.Duration
		}{
			{[]string{"reconcile", "source", "git", "flux-system", "-n", "flux-system", "--timeout=3m"}, 3*time.Minute + 30*time.Second},
			{[]string{"reconcile", "kustomization", "core", "-n", "flux-system", "--with-source", "--timeout=5m"}, 5*time.Minute + 30*time.Second},
			{[]string{"reconcile", "kustomization", "helm", "-n", "flux-system", "--with-source", "--timeout=5m"}, 5*time.Minute + 30*time.Second},
			{[]string{"reconcile", "kustomization", "traefik", "-n", "flux-system", "--with-source", "--timeout=5m"}, 5*time.Minute + 30*time.Second},
			{[]string{"reconcile", "kustomization", "vault", "-n", "flux-system", "--with-source", "--timeout=10m"}, 10*time.Minute + 30*time.Second},
			{[]string{"reconcile", "kustomization", "postgres", "-n", "flux-system", "--with-source", "--timeout=10m"}, 10*time.Minute + 30*time.Second},
			{[]string{"reconcile", "kustomization", "gitea", "-n", "flux-system", "--with-source", "--timeout=10m"}, 10*time.Minute + 30*time.Second},
		}
		for _, s := range steps {
			if _, err := o.run(ctx, s.timeout, "flux", s.args...); err != nil {
				o.log.Printf("warning: flux command failed (%s): %v", strings.Join(s.args, " "), err)
			}
		}
		return nil
	}
	now := time.Now().UTC().Format(time.RFC3339)
	_, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "annotate", "kustomizations.kustomize.toolkit.fluxcd.io", "--all", "reconcile.fluxcd.io/requestedAt="+now, "--overwrite")
	return err
}
// kubectl runs a kubectl invocation through the runner, bounded by timeout,
// and returns its output.
func (o *Orchestrator) kubectl(ctx context.Context, timeout time.Duration, args ...string) (string, error) {
	return o.run(ctx, timeout, "kubectl", args...)
}
// ssh runs command on node over SSH in batch mode (no password prompts) with
// a short connect timeout, as the configured SSH user when one is set.
func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (string, error) {
	target := node
	if user := o.cfg.SSHUser; user != "" {
		target = user + "@" + node
	}
	args := []string{"-o", "BatchMode=yes", "-o", "ConnectTimeout=8", target, command}
	return o.run(ctx, 45*time.Second, "ssh", args...)
}
// run executes name with args through the runner, layering timeout on top of
// the caller's context so each external command is individually bounded.
func (o *Orchestrator) run(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
	runCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return o.runner.Run(runCtx, name, args...)
}
// lines splits in on newlines, trims whitespace from each line, and returns
// the non-empty results; a blank or all-whitespace input yields nil.
func lines(in string) []string {
	var out []string
	for _, raw := range strings.Split(in, "\n") {
		if trimmed := strings.TrimSpace(raw); trimmed != "" {
			out = append(out, trimmed)
		}
	}
	return out
}
// bestEffort runs fn immediately and, when it fails, downgrades the error to
// a logged warning tagged with name so callers never see it.
func (o *Orchestrator) bestEffort(name string, fn func() error) {
	err := fn()
	if err == nil {
		return
	}
	o.log.Printf("warning: %s: %v", name, err)
}