// File: ananke/internal/cluster/orchestrator_coordination.go
// Peer coordination, startup-intent guarding, and etcd snapshot
// verification helpers for the cluster orchestrator.

package cluster
import (
"context"
"fmt"
"path/filepath"
"strconv"
"strings"
"time"
"scm.bstein.dev/bstein/ananke/internal/state"
)
// intentAge reports how long ago the intent record was last updated.
// An intent with a zero UpdatedAt yields a zero duration.
func intentAge(in state.Intent) time.Duration {
	updated := in.UpdatedAt
	if updated.IsZero() {
		return 0
	}
	return time.Since(updated)
}
// intentFresh reports whether the intent was updated within maxAge.
// An intent without a timestamp is treated as current.
func intentFresh(in state.Intent, maxAge time.Duration) bool {
	if in.UpdatedAt.IsZero() {
		return true
	}
	return maxAge >= intentAge(in)
}
// startupGuardAge returns how old a peer intent may be before the startup
// guard considers it stale. Falls back to 15 minutes when the configured
// value is missing or non-positive.
func (o *Orchestrator) startupGuardAge() time.Duration {
	const defaultGuardSeconds = 900 // 15 minutes
	secs := o.cfg.Coordination.StartupGuardMaxAgeSec
	if secs <= 0 {
		secs = defaultGuardSeconds
	}
	return time.Duration(secs) * time.Second
}
// startupShutdownCooldown returns the minimum delay required between a
// peer's completed shutdown and a local startup. Falls back to 45 seconds
// when the configured value is missing or non-positive.
func (o *Orchestrator) startupShutdownCooldown() time.Duration {
	const defaultCooldownSeconds = 45
	secs := o.cfg.Startup.ShutdownCooldownSeconds
	if secs <= 0 {
		secs = defaultCooldownSeconds
	}
	return time.Duration(secs) * time.Second
}
// coordinationPeers returns the deduplicated, trimmed list of peer hosts to
// coordinate with: every configured PeerHosts entry plus the optional
// ForwardShutdownHost. Order of first appearance is preserved; blanks are
// dropped.
func (o *Orchestrator) coordinationPeers() []string {
	capacity := len(o.cfg.Coordination.PeerHosts) + 1
	seen := make(map[string]struct{}, capacity)
	out := make([]string, 0, capacity)
	add := func(node string) {
		node = strings.TrimSpace(node)
		if node == "" {
			return
		}
		if _, dup := seen[node]; dup {
			return
		}
		seen[node] = struct{}{}
		out = append(out, node)
	}
	for _, node := range o.cfg.Coordination.PeerHosts {
		add(node)
	}
	// add already trims and skips empty values, so no pre-check is needed.
	add(o.cfg.Coordination.ForwardShutdownHost)
	return out
}
// guardPeerStartupIntents checks every coordination peer's published intent
// before local startup proceeds. A fresh conflicting intent (active shutdown,
// startup in progress, or a shutdown completed within the cooldown window)
// blocks startup with an error; stale intents are logged and, for startups,
// auto-cleared remotely on a best-effort basis so a crashed peer cannot
// wedge the cluster forever. Returns nil when startup may proceed.
func (o *Orchestrator) guardPeerStartupIntents(ctx context.Context) error {
	peers := o.coordinationPeers()
	if len(peers) == 0 {
		return nil
	}
	guardAge := o.startupGuardAge()
	localRole := strings.ToLower(strings.TrimSpace(o.cfg.Coordination.Role))
	for _, peer := range peers {
		peerStatus, err := o.readRemotePeerStatus(ctx, peer)
		if err != nil {
			// Best-effort guard: an unreachable peer must not block startup.
			o.log.Printf("warning: peer startup guard skipped intent check for %s: %v", peer, err)
			continue
		}
		intent := peerStatus.Intent
		switch intent.State {
		case "", state.IntentNormal:
			// No intent recorded, or peer is in its normal state: no conflict.
			continue
		case state.IntentShuttingDown:
			// A fresh shutdown intent blocks startup; a stale one is assumed abandoned.
			if intentFresh(intent, guardAge) {
				return fmt.Errorf("startup blocked: peer %s has active shutdown intent (reason=%q age=%s)", peer, intent.Reason, intentAge(intent).Round(time.Second))
			}
			o.log.Printf("warning: peer %s shutdown intent appears stale; allowing startup", peer)
		case state.IntentStartupInProgress:
			if !peerStatus.BootstrapActive {
				// Intent says "starting" but the bootstrap unit is not running:
				// treat the record as stale and clear it remotely (best effort).
				o.log.Printf("warning: peer %s reports startup_in_progress but bootstrap service is inactive (reason=%q age=%s); auto-clearing stale peer intent",
					peer, intent.Reason, intentAge(intent).Round(time.Second))
				o.bestEffort(fmt.Sprintf("clear stale peer startup intent on %s", peer), func() error {
					return o.clearRemotePeerIntent(ctx, peer, "auto-clear stale peer startup intent")
				})
				continue
			}
			// A coordinator is allowed to proceed alongside an operator-driven
			// ("manual-startup") peer startup.
			if localRole == "coordinator" && strings.EqualFold(strings.TrimSpace(intent.Reason), "manual-startup") {
				o.log.Printf("warning: peer %s has manual startup in progress (age=%s); allowing coordinator startup to continue",
					peer, intentAge(intent).Round(time.Second))
				continue
			}
			if intentFresh(intent, guardAge) {
				return fmt.Errorf("startup blocked: peer %s reports startup_in_progress (reason=%q age=%s)", peer, intent.Reason, intentAge(intent).Round(time.Second))
			}
			// Stale startup intent: clear it remotely (best effort) and proceed.
			o.log.Printf("warning: peer %s startup intent appears stale; auto-clearing and allowing startup", peer)
			o.bestEffort(fmt.Sprintf("clear stale peer startup intent on %s", peer), func() error {
				return o.clearRemotePeerIntent(ctx, peer, "auto-clear stale peer startup intent")
			})
		case state.IntentShutdownComplete:
			// Enforce a cooldown window after a peer's completed shutdown.
			if intentFresh(intent, o.startupShutdownCooldown()) {
				return fmt.Errorf("startup blocked: peer %s completed shutdown too recently (age=%s)", peer, intentAge(intent).Round(time.Second))
			}
		default:
			// Unknown states are tolerated so newer peers cannot brick older ones.
			o.log.Printf("warning: peer %s intent state %q is unknown; ignoring", peer, intent.State)
		}
	}
	return nil
}
// readRemotePeerStatus fetches a peer's coordination status over SSH: whether
// its ananke-bootstrap service is active (signaled via marker strings) and
// its current intent as printed by the remote ananke CLI. The node must be
// listed in ssh_managed_nodes.
func (o *Orchestrator) readRemotePeerStatus(ctx context.Context, node string) (remotePeerStatus, error) {
	if !o.sshManaged(node) {
		return remotePeerStatus{}, fmt.Errorf("%s is not in ssh_managed_nodes", node)
	}
	out, err := o.ssh(ctx, node, "if sudo -n /usr/bin/systemctl is-active --quiet ananke-bootstrap.service; then echo __ANANKE_BOOTSTRAP_ACTIVE__; else echo __ANANKE_BOOTSTRAP_IDLE__; fi; sudo -n /usr/local/bin/ananke intent --config /etc/ananke/ananke.yaml")
	if err != nil {
		return remotePeerStatus{}, err
	}
	intent, parseErr := state.ParseIntentOutput(out)
	if parseErr != nil {
		return remotePeerStatus{}, fmt.Errorf("parse remote intent output: %w", parseErr)
	}
	return remotePeerStatus{
		BootstrapActive: strings.Contains(out, "__ANANKE_BOOTSTRAP_ACTIVE__"),
		Intent:          intent,
	}, nil
}
// clearRemotePeerIntent resets a peer's intent to normal over SSH, recording
// the given reason. The reason is shell-quoted before being embedded in the
// remote command line.
func (o *Orchestrator) clearRemotePeerIntent(ctx context.Context, node string, reason string) error {
	command := "sudo -n /usr/local/bin/ananke intent --config /etc/ananke/ananke.yaml --set normal --reason " +
		shellQuote(reason) +
		" --source startup --execute"
	_, err := o.ssh(ctx, node, command)
	return err
}
// shellQuote wraps v in single quotes for safe interpolation into a POSIX
// shell command line, escaping embedded single quotes with the standard
// '"'"' idiom.
func shellQuote(v string) string {
	var b strings.Builder
	b.Grow(len(v) + 2)
	b.WriteByte('\'')
	for _, r := range v {
		if r == '\'' {
			b.WriteString(`'"'"'`)
		} else {
			b.WriteRune(r)
		}
	}
	b.WriteByte('\'')
	return b.String()
}
// verifyEtcdSnapshot sanity-checks an etcd snapshot on node over SSH: the
// file must be non-empty and at least 1 MiB, appear in `k3s etcd-snapshot ls`
// output, and hash to a well-formed sha256. Skipped entirely in dry-run mode.
// NOTE(review): the shellQuote'd path is embedded inside another
// single-quoted `sh -lc '...'` string, so the nested quoting only survives
// for paths without spaces or shell metacharacters — confirm snapshot paths
// are always plain before relying on this.
func (o *Orchestrator) verifyEtcdSnapshot(ctx context.Context, node string, snapshotPath string) error {
	if o.runner.DryRun {
		return nil
	}
	path := strings.TrimSpace(snapshotPath)
	if path == "" {
		return fmt.Errorf("etcd snapshot verification failed: snapshot path is empty")
	}
	quoted := shellQuote(path)
	// `test -s` rejects a missing/empty file; stat prints the size in bytes.
	sizeOut, err := o.ssh(ctx, node, fmt.Sprintf("sudo -n sh -lc 'test -s %s && stat -c %%s %s'", quoted, quoted))
	if err != nil {
		return fmt.Errorf("etcd snapshot verification failed for %s on %s: %w", path, node, err)
	}
	size, convErr := strconv.ParseInt(strings.TrimSpace(sizeOut), 10, 64)
	if convErr != nil {
		return fmt.Errorf("etcd snapshot verification failed for %s on %s: parse size %q: %w", path, node, strings.TrimSpace(sizeOut), convErr)
	}
	const minSnapshotBytes = int64(1 << 20) // 1 MiB sanity floor.
	if size < minSnapshotBytes {
		return fmt.Errorf("etcd snapshot verification failed for %s on %s: snapshot too small (%d bytes)", path, node, size)
	}
	// The snapshot must also be registered with k3s itself; accept a match on
	// either the full path or just the basename, since ls output format varies.
	lsOut, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "ls")
	if err != nil {
		return fmt.Errorf("etcd snapshot verification failed for %s on %s: list snapshots: %w", path, node, err)
	}
	if !strings.Contains(lsOut, path) && !strings.Contains(lsOut, filepath.Base(path)) {
		return fmt.Errorf("etcd snapshot verification failed for %s on %s: snapshot is not present in k3s etcd-snapshot ls output", path, node)
	}
	// Only the first field (the digest) of sha256sum output is kept.
	sumOut, err := o.ssh(ctx, node, fmt.Sprintf("sudo -n sh -lc 'sha256sum %s | awk \"{print \\$1}\"'", quoted))
	if err != nil {
		return fmt.Errorf("etcd snapshot verification failed for %s on %s: sha256: %w", path, node, err)
	}
	hash := strings.TrimSpace(sumOut)
	// A sha256 digest is exactly 64 hex characters.
	if len(hash) != 64 {
		return fmt.Errorf("etcd snapshot verification failed for %s on %s: invalid sha256 %q", path, node, hash)
	}
	o.log.Printf("etcd snapshot verified path=%s size_bytes=%d sha256=%s", path, size, hash[:12])
	return nil
}
// runSudoK3S executes `sudo -n k3s <args...>` on node over SSH, trying each
// known k3s binary location in order and returning the first successful
// output. If every candidate fails, the last error is returned.
func (o *Orchestrator) runSudoK3S(ctx context.Context, node string, args ...string) (string, error) {
	candidates := []string{"/usr/local/bin/k3s", "/usr/bin/k3s", "k3s"}
	var lastErr error
	for _, bin := range candidates {
		command := strings.Join(append([]string{"sudo", "-n", bin}, args...), " ")
		out, err := o.ssh(ctx, node, command)
		if err == nil {
			return out, nil
		}
		lastErr = err
	}
	// Unreachable while candidates is non-empty; kept as a defensive fallback.
	if lastErr == nil {
		lastErr = fmt.Errorf("no k3s executable candidates configured")
	}
	return "", lastErr
}
// controlPlaneUsesExternalDatastore reports whether the k3s server on node
// is configured with an external datastore (--datastore-endpoint=) rather
// than the embedded etcd, based on the unit definition from
// `systemctl cat k3s`.
func (o *Orchestrator) controlPlaneUsesExternalDatastore(ctx context.Context, node string) (bool, error) {
	// Use non-interactive sudo (-n), consistent with every other remote
	// command in this file, so a password prompt can never hang the session.
	out, err := o.ssh(ctx, node, "sudo -n systemctl cat k3s")
	if err != nil {
		return false, fmt.Errorf("inspect k3s service on %s for datastore mode: %w", node, err)
	}
	return strings.Contains(out, "--datastore-endpoint="), nil
}
// waitForAPI polls `kubectl version` until the Kubernetes API answers,
// making up to attempts tries with a pause of sleep between them. It returns
// nil on first success, ctx.Err() if the context is canceled while waiting,
// and a timeout error once all attempts are exhausted. Dry-run mode skips
// the check entirely.
func (o *Orchestrator) waitForAPI(ctx context.Context, attempts int, sleep time.Duration) error {
	if o.runner.DryRun {
		return nil
	}
	for i := 0; i < attempts; i++ {
		if _, err := o.kubectl(ctx, 5*time.Second, "version", "--request-timeout=5s"); err == nil {
			return nil
		}
		if i == attempts-1 {
			break // no point sleeping after the final attempt
		}
		// Honor cancellation during the back-off instead of blocking in Sleep.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(sleep):
		}
	}
	return fmt.Errorf("kubernetes API did not become reachable within timeout")
}