ananke/testing/orchestrator/hooks_gap_matrix_part10_test.go

561 lines
27 KiB
Go

package orchestrator
import (
"context"
"encoding/base64"
"errors"
"io"
"log"
"net"
"os"
"path/filepath"
"strings"
"testing"
"time"
"scm.bstein.dev/bstein/ananke/internal/cluster"
"scm.bstein.dev/bstein/ananke/internal/config"
"scm.bstein.dev/bstein/ananke/internal/execx"
"scm.bstein.dev/bstein/ananke/internal/state"
)
// newHookOrchestratorAdvanced builds a cluster.Orchestrator wired for hook-level
// tests, together with a commandRecorder capturing dispatched commands.
// Signature: newHookOrchestratorAdvanced(t *testing.T, cfg config.Config, dryRun bool, run commandOverride, runSensitive commandOverride) (*cluster.Orchestrator, *commandRecorder).
// Why: this part10 matrix needs dry-run and non-dry-run variants while keeping
// command dispatch deterministic from the top-level testing module. A nil run
// or runSensitive override falls back to the recorder-backed lifecycle dispatcher.
func newHookOrchestratorAdvanced(
t *testing.T,
cfg config.Config,
dryRun bool,
run func(context.Context, time.Duration, string, ...string) (string, error),
runSensitive func(context.Context, time.Duration, string, ...string) (string, error),
) (*cluster.Orchestrator, *commandRecorder) {
t.Helper()
// The orchestrator persists state under cfg.State.Dir; make sure it exists.
if err := os.MkdirAll(cfg.State.Dir, 0o755); err != nil {
t.Fatalf("ensure state dir: %v", err)
}
rec := &commandRecorder{}
// Only build the recorder-backed dispatcher lazily, when a fallback is needed.
switch {
case run == nil && runSensitive == nil:
shared := lifecycleDispatcher(rec)
run, runSensitive = shared, shared
case run == nil:
run = lifecycleDispatcher(rec)
case runSensitive == nil:
runSensitive = lifecycleDispatcher(rec)
}
logger := log.New(io.Discard, "", 0) // keep test output quiet
runner := &execx.Runner{DryRun: dryRun}
orch := cluster.New(cfg, runner, state.New(cfg.State.RunHistoryPath), logger)
orch.SetCommandOverrides(run, runSensitive)
return orch, rec
}
// TestHookGapMatrixPart10LowFileClosure exercises remaining low-coverage
// orchestrator branches through exported test hooks.
// Signature: TestHookGapMatrixPart10LowFileClosure(t *testing.T).
// Why: closes remaining branch gaps on low-coverage orchestrator files using
// targeted hook-level scenarios instead of brittle full-drill reruns.
// Each subtest stubs specific kubectl/ssh/curl command patterns and asserts
// on the exact error-message substrings those branches emit.
func TestHookGapMatrixPart10LowFileClosure(t *testing.T) {
t.Run("critical-vault-low-branches", func(t *testing.T) {
// Malformed JSON from `vault status` should surface a parse error.
t.Run("vault-sealed-parse-error", func(t *testing.T) {
cfg := lifecycleConfig(t)
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
if name == "kubectl" && strings.Contains(command, "vault status -format=json") {
return "{invalid-json", nil
}
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
if _, err := orch.TestHookVaultSealed(context.Background()); err == nil || !strings.Contains(err.Error(), "parse vault status") {
t.Fatalf("expected vault status parse error branch, got %v", err)
}
})
// A secret that decodes to only whitespace must be rejected as empty.
t.Run("vault-unseal-key-empty-decoded", func(t *testing.T) {
cfg := lifecycleConfig(t)
cfg.Startup.VaultUnsealBreakglassCommand = ""
// base64 of a single space: decodes non-empty but trims to empty.
encodedEmpty := base64.StdEncoding.EncodeToString([]byte(" "))
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
if name == "kubectl" && strings.Contains(command, "get secret vault-init") {
return encodedEmpty, nil
}
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
if _, err := orch.TestHookVaultUnsealKey(context.Background()); err == nil || !strings.Contains(err.Error(), "vault-init unseal key is empty") {
t.Fatalf("expected empty decoded unseal key branch, got %v", err)
}
})
// Pointing the key file at an existing directory forces the write error path.
t.Run("write-unseal-key-file-write-error", func(t *testing.T) {
cfg := lifecycleConfig(t)
cfg.Startup.VaultUnsealKeyFile = t.TempDir()
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, nil, nil)
if err := orch.TestHookWriteVaultUnsealKeyFile("vault-key"); err == nil || !strings.Contains(err.Error(), "write vault unseal key file") {
t.Fatalf("expected write failure branch when key path is a directory, got %v", err)
}
})
// Covers the "<no value>" jsonpath response and a rollout-wait failure.
t.Run("workload-ready-no-value-and-ensure-error", func(t *testing.T) {
cfg := lifecycleConfig(t)
runNoValue := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
if name == "kubectl" && strings.Contains(command, "jsonpath={.status.readyReplicas}") {
return "<no value>", nil
}
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
orchNoValue, _ := newHookOrchestratorAdvanced(t, cfg, false, runNoValue, runNoValue)
ready, err := orchNoValue.TestHookWorkloadReady(context.Background(), "vault", "statefulset", "vault")
if err != nil || ready {
t.Fatalf("expected no-value readiness branch, ready=%v err=%v", ready, err)
}
runEnsureErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, " scale "):
return "", nil
case name == "kubectl" && strings.Contains(command, "get pods -o custom-columns"):
return "", nil
case name == "kubectl" && strings.Contains(command, "rollout status statefulset/victoria-metrics-single-server"):
return "", errors.New("rollout failed")
default:
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
}
orchEnsureErr, _ := newHookOrchestratorAdvanced(t, cfg, false, runEnsureErr, runEnsureErr)
if err := orchEnsureErr.TestHookEnsureCriticalStartupWorkloads(context.Background()); err == nil || !strings.Contains(err.Error(), "rollout failed") {
t.Fatalf("expected ensureCriticalStartupWorkloads wait error branch, got %v", err)
}
})
// First half: a non-Running pod phase aborts unseal. Second half: a sealed
// status followed by a failing re-check hits the follow-up error branch.
t.Run("ensure-vault-unsealed-phase-and-followup-errors", func(t *testing.T) {
cfgPhase := lifecycleConfig(t)
runPhase := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
if name == "kubectl" && strings.Contains(command, "get pod vault-0") {
return "Pending", nil
}
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
orchPhase, _ := newHookOrchestratorAdvanced(t, cfgPhase, false, runPhase, runPhase)
if err := orchPhase.TestHookEnsureVaultUnsealed(context.Background()); err == nil || !strings.Contains(err.Error(), "pod phase") {
t.Fatalf("expected pod phase guard branch, got %v", err)
}
cfgFollowup := lifecycleConfig(t)
// Counts status checks: first reports sealed, subsequent checks fail.
sealedCalls := 0
runFollowup := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get pod vault-0"):
return "Running", nil
case name == "kubectl" && strings.Contains(command, "get secret vault-init"):
return base64.StdEncoding.EncodeToString([]byte("vault-key")), nil
case name == "kubectl" && strings.Contains(command, "vault status -format=json"):
sealedCalls++
if sealedCalls == 1 {
return `{"sealed":true}`, nil
}
return "", errors.New("vault status failed")
default:
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
}
runSensitive := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
if name == "kubectl" && strings.Contains(command, "vault operator unseal") {
return "ok", nil
}
return runFollowup(ctx, timeout, name, args...)
}
orchFollowup, _ := newHookOrchestratorAdvanced(t, cfgFollowup, false, runFollowup, runSensitive)
if err := orchFollowup.TestHookEnsureVaultUnsealed(context.Background()); err == nil || !strings.Contains(err.Error(), "vault status check failed") {
t.Fatalf("expected follow-up sealed status error branch, got %v", err)
}
})
})
t.Run("drain-and-scaling-low-branches", func(t *testing.T) {
// All cordon/drain calls fail; the hook should aggregate per-worker errors
// and still run the pod diagnostics query (which includes a malformed row).
t.Run("drain-workers-error-aggregation-and-diagnostics", func(t *testing.T) {
cfg := lifecycleConfig(t)
cfg.Shutdown.DrainParallelism = 0
workers := []string{"w1", "w2", "w3", "w4", "w5"}
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "cordon "):
return "", errors.New("cordon denied")
case name == "kubectl" && strings.Contains(command, "drain "):
return "", errors.New("drain denied")
case name == "kubectl" && strings.Contains(command, "get pods -A --field-selector"):
return strings.Join([]string{
"malformed",
"ns p1 Running Deployment",
"ns p2 Running Deployment",
"ns p3 Running Deployment",
"ns p4 Running Deployment",
"ns p5 Running Deployment",
"ns p6 Running Deployment",
"ns p7 Running Deployment",
}, "\n"), nil
default:
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
}
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
err := orch.TestHookDrainWorkers(context.Background(), workers)
if err == nil || !strings.Contains(err.Error(), "drain workers had 5 errors") {
t.Fatalf("expected drain aggregation branch, got %v", err)
}
})
// Parallelism above node count is clamped; unmanaged nodes are skipped.
t.Run("run-ssh-across-nodes-clamp-and-skip", func(t *testing.T) {
cfg := lifecycleConfig(t)
cfg.Shutdown.SSHParallelism = 99
cfg.SSHManagedNodes = []string{"titan-db"}
rec := &commandRecorder{}
base := lifecycleDispatcher(rec)
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, base, base)
orch.TestHookRunSSHAcrossNodes(context.Background(), []string{"titan-db", "not-managed"}, "noop", "echo ok")
if !rec.contains("atlas@titan-db echo ok") {
t.Fatalf("expected managed ssh execution branch")
}
})
// A header-only `etcd-snapshot ls` listing means no snapshots exist.
t.Run("latest-etcd-snapshot-empty-list", func(t *testing.T) {
cfg := lifecycleConfig(t)
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
if name == "ssh" && strings.Contains(command, "etcd-snapshot ls") {
return "NAME LOCATION", nil
}
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
if _, err := orch.TestHookLatestEtcdSnapshotPath(context.Background(), "titan-db"); err == nil || !strings.Contains(err.Error(), "no etcd snapshots found") {
t.Fatalf("expected empty snapshot-list branch, got %v", err)
}
})
// First: with no configured workers and the API down, effective workers
// fall back to the SSH inventory. Second: blocking the snapshot path with
// a directory forces the scale-down snapshot write error.
t.Run("scaling-fallback-and-snapshot-writer-errors", func(t *testing.T) {
cfg := lifecycleConfig(t)
cfg.Workers = nil
cfg.SSHManagedNodes = []string{"titan-db", "titan-23", " "}
cfg.ControlPlanes = []string{"titan-db"}
runWorkers := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
if name == "kubectl" && len(args) >= 2 && args[0] == "get" && args[1] == "nodes" {
return "", errors.New("api down")
}
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
orchWorkers, _ := newHookOrchestratorAdvanced(t, cfg, false, runWorkers, runWorkers)
workers, err := orchWorkers.TestHookEffectiveWorkers(context.Background())
if err != nil || len(workers) == 0 {
t.Fatalf("expected inventory worker fallback branch, workers=%v err=%v", workers, err)
}
cfgWrite := lifecycleConfig(t)
cfgWrite.State.Dir = t.TempDir()
// Force write failure by making the snapshot output path a directory.
if err := os.MkdirAll(filepath.Join(cfgWrite.State.Dir, "scaled-workloads.json"), 0o755); err != nil {
t.Fatalf("create snapshot blocker directory: %v", err)
}
runWrite := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get deployment -A -o jsonpath="):
return "monitoring\tgrafana\t1\n", nil
case name == "kubectl" && strings.Contains(command, "get statefulset -A -o jsonpath="):
return "", nil
default:
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
}
orchWrite, _ := newHookOrchestratorAdvanced(t, cfgWrite, false, runWrite, runWrite)
if err := orchWrite.TestHookScaleDownApps(context.Background()); err == nil || !strings.Contains(err.Error(), "write scaled workload snapshot") {
t.Fatalf("expected scaled snapshot write-failure branch, got %v", err)
}
})
})
t.Run("flux-ingress-service-and-inventory-low-branches", func(t *testing.T) {
// A Ready=False condition with empty reason/message exercises the detail
// fallback; a failing jobs query covers the immutable-heal error branch.
t.Run("flux-health-ready-reason-fallback-and-heal-query-error", func(t *testing.T) {
cfg := lifecycleConfig(t)
runReady := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
return `{"items":[{"metadata":{"namespace":"","name":"skip"},"spec":{"suspend":false},"status":{"conditions":[]}},{"metadata":{"namespace":"flux-system","name":"apps"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"False","reason":"","message":""}]}}]}`, nil
case name == "kubectl" && strings.Contains(command, "get jobs -A -o json"):
return "", errors.New("jobs query failed")
default:
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
}
orchReady, _ := newHookOrchestratorAdvanced(t, cfg, false, runReady, runReady)
ready, detail, err := orchReady.TestHookFluxHealthReady(context.Background())
if err != nil || ready || !strings.Contains(detail, "ready=false") {
t.Fatalf("expected flux ready-reason fallback branch, ready=%v detail=%q err=%v", ready, detail, err)
}
if _, err := orchReady.TestHookHealImmutableFluxJobs(context.Background()); err == nil || !strings.Contains(err.Error(), "query jobs") {
t.Fatalf("expected immutable-job query error branch, got %v", err)
}
})
// The immutable-job failure message triggers the heal path while a
// pre-canceled context ends the wait loop with context.Canceled.
t.Run("wait-for-flux-health-immutable-heal-path-with-cancel", func(t *testing.T) {
cfg := lifecycleConfig(t)
cfg.Startup.FluxHealthWaitSeconds = 1
cfg.Startup.FluxHealthPollSeconds = 1
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
return `{"items":[{"metadata":{"namespace":"flux-system","name":"apps"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"False","message":"Job failed: field is immutable","reason":"ReconciliationFailed"}]}}]}`, nil
case name == "kubectl" && strings.Contains(command, "get jobs -A -o json"):
return "", errors.New("jobs unavailable")
default:
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
}
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := orch.TestHookWaitForFluxHealth(ctx); !errors.Is(err, context.Canceled) {
t.Fatalf("expected canceled flux wait branch, got %v", err)
}
})
// Empty label maps and whitespace-only values are skipped; only the
// concrete label triggers a kubectl label command.
t.Run("required-node-labels-skip-empty-and-apply-one", func(t *testing.T) {
cfg := lifecycleConfig(t)
cfg.Startup.RequiredNodeLabels = map[string]map[string]string{
"node-empty-map": {},
"node-empty-val": {"zone": " "},
"node-apply": {"zone": "lab-a"},
}
rec := &commandRecorder{}
base := lifecycleDispatcher(rec)
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, base, base)
if err := orch.TestHookEnsureRequiredNodeLabels(context.Background()); err != nil {
t.Fatalf("expected ensureRequiredNodeLabels skip/apply branches, got %v", err)
}
if !rec.contains("label node node-apply --overwrite zone=lab-a") {
t.Fatalf("expected non-empty label application branch")
}
})
// Dry-run should short-circuit convergence; with only the critical-endpoint
// requirement enabled, a failing endpoints query must fail convergence.
t.Run("wait-for-startup-convergence-dryrun-and-critical-endpoint-fail", func(t *testing.T) {
cfgDry := lifecycleConfig(t)
orchDry, _ := newHookOrchestratorAdvanced(t, cfgDry, true, nil, nil)
if err := orchDry.TestHookWaitForStartupConvergence(context.Background()); err != nil {
t.Fatalf("expected startup convergence dry-run fast path, got %v", err)
}
cfgFail := lifecycleConfig(t)
cfgFail.Startup.RequireIngressChecklist = false
cfgFail.Startup.RequireServiceChecklist = false
cfgFail.Startup.RequireFluxHealth = false
cfgFail.Startup.RequireWorkloadConvergence = false
cfgFail.Startup.RequireCriticalServiceEndpoints = true
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
if name == "kubectl" && strings.Contains(command, "get endpoints") {
return "", errors.New("endpoint query failed")
}
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
orchFail, _ := newHookOrchestratorAdvanced(t, cfgFail, false, run, run)
if err := orchFail.TestHookWaitForStartupConvergence(context.Background()); err == nil || !strings.Contains(err.Error(), "query endpoints") {
t.Fatalf("expected critical-endpoint convergence failure branch, got %v", err)
}
})
// A blank host short-circuits discovery; a failing ingress query errors out.
t.Run("ingress-namespace-discovery-empty-and-query-error", func(t *testing.T) {
cfg := lifecycleConfig(t)
orchEmpty, _ := newHookOrchestratorAdvanced(t, cfg, false, nil, nil)
namespaces, err := orchEmpty.TestHookDiscoverIngressNamespacesForHost(context.Background(), " ")
if err != nil || len(namespaces) != 0 {
t.Fatalf("expected empty-host fast path, namespaces=%v err=%v", namespaces, err)
}
runErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
if name == "kubectl" && strings.Contains(command, "get ingress -A -o json") {
return "", errors.New("ingress query failed")
}
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
orchErr, _ := newHookOrchestratorAdvanced(t, cfg, false, runErr, runErr)
if _, err := orchErr.TestHookDiscoverIngressNamespacesForHost(context.Background(), "metrics.bstein.dev"); err == nil || !strings.Contains(err.Error(), "query ingresses") {
t.Fatalf("expected ingress query error branch, got %v", err)
}
})
// A local listener returning 503 makes the checklist fail; the empty-name
// check falls back to the URL in the detail string.
t.Run("service-checklist-name-fallback-and-contains-empty-marker", func(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("open local listener: %v", err)
}
defer listener.Close()
go func() {
conn, acceptErr := listener.Accept()
if acceptErr == nil {
_, _ = conn.Write([]byte("HTTP/1.1 503 Service Unavailable\r\nContent-Length: 3\r\n\r\nbad"))
_ = conn.Close()
}
}()
cfg := lifecycleConfig(t)
cfg.Startup.ServiceChecklist = []config.ServiceChecklistCheck{{
Name: "",
URL: "http://" + listener.Addr().String() + "/health",
AcceptedStatuses: []int{200},
}}
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, nil, nil)
ready, detail := orch.TestHookServiceChecklistReady(context.Background())
if ready || !strings.Contains(detail, "http://") {
t.Fatalf("expected service checklist URL-name fallback failure, ready=%v detail=%q", ready, detail)
}
// A whitespace-only marker compacts to empty and matches any body.
if !cluster.TestHookChecklistContains("any body", " ") {
t.Fatalf("expected empty-marker compact contains branch")
}
})
// Zero wait/poll seconds take defaults; the pre-canceled context still
// terminates the reachability wait with context.Canceled.
t.Run("node-inventory-reachability-defaults-and-cancel", func(t *testing.T) {
cfg := lifecycleConfig(t)
cfg.Startup.RequireNodeInventoryReach = true
cfg.Startup.NodeInventoryReachWaitSeconds = 0
cfg.Startup.NodeInventoryReachPollSeconds = 0
cfg.Startup.IgnoreUnavailableNodes = []string{"titan-23"}
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
if name == "ssh" && strings.Contains(command, "__ANANKE_NODE_REACHABLE__") {
return "unexpected", nil
}
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := orch.TestHookWaitForNodeInventoryReachability(ctx); !errors.Is(err, context.Canceled) {
t.Fatalf("expected canceled inventory reachability branch, got %v", err)
}
})
})
t.Run("poststart-timesync-workload-branches", func(t *testing.T) {
// Zero wait/poll take defaults; failing curl plus canceled context ends
// the probe wait with context.Canceled.
t.Run("poststart-default-poll-and-cancel", func(t *testing.T) {
cfg := lifecycleConfig(t)
cfg.Startup.PostStartProbes = []string{"https://metrics.bstein.dev/api/health"}
cfg.Startup.PostStartProbeWaitSeconds = 0
cfg.Startup.PostStartProbePollSeconds = 0
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
if name == "curl" {
return "", errors.New("curl failed")
}
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := orch.TestHookWaitForPostStartProbes(ctx); !errors.Is(err, context.Canceled) {
t.Fatalf("expected canceled post-start branch, got %v", err)
}
})
// A fake flux binary on PATH satisfies the command-exists check; a failing
// reconcile is expected to be treated as warning-only (nil error).
t.Run("resume-flux-with-commandexists-and-flux-fail-warning-path", func(t *testing.T) {
cfg := lifecycleConfig(t)
fakeBin := t.TempDir()
fluxPath := filepath.Join(fakeBin, "flux")
if err := os.WriteFile(fluxPath, []byte("#!/bin/sh\nexit 0\n"), 0o755); err != nil {
t.Fatalf("write fake flux binary: %v", err)
}
t.Setenv("PATH", fakeBin+string(os.PathListSeparator)+os.Getenv("PATH"))
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
if name == "flux" {
return "", errors.New("flux reconcile failed")
}
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
if err := orch.TestHookResumeFluxAndReconcile(context.Background()); err != nil {
t.Fatalf("expected resume flux warning-only branch, got %v", err)
}
})
// Quorum mode with defaulted wait/poll/quorum plus a canceled context
// covers both the time-sync and external-datastore cancel branches.
t.Run("timesync-defaults-quorum-and-datastore-cancel", func(t *testing.T) {
cfg := lifecycleConfig(t)
cfg.Startup.TimeSyncMode = "quorum"
cfg.Startup.TimeSyncWaitSeconds = 0
cfg.Startup.TimeSyncPollSeconds = 0
cfg.Startup.TimeSyncQuorum = 0
cfg.SSHManagedNodes = []string{"titan-db"}
cfg.SSHNodeHosts["db-host"] = "127.0.0.1"
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "sh" && strings.Contains(command, "timedatectl show -p NTPSynchronized"):
return "no", nil
case name == "ssh" && strings.Contains(command, "timedatectl show -p NTPSynchronized"):
return "", errors.New("ssh timed out")
case name == "ssh" && strings.Contains(command, "systemctl cat k3s"):
return "ExecStart=/usr/local/bin/k3s server --datastore-endpoint=postgres://127.0.0.1/k3s", nil
default:
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
}
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := orch.TestHookWaitForTimeSync(ctx, []string{"", "titan-db"}); !errors.Is(err, context.Canceled) {
t.Fatalf("expected canceled time-sync branch, got %v", err)
}
if err := orch.TestHookPreflightExternalDatastore(ctx); !errors.Is(err, context.Canceled) {
t.Fatalf("expected canceled datastore preflight branch, got %v", err)
}
})
// Convergence ignores empty-kind items, zero-replica deployments, and
// daemonsets pinned to ignored nodes; then the auto-heal dry-run and
// query-error branches are exercised.
t.Run("workload-convergence-and-ignore-quick-branches", func(t *testing.T) {
cfg := lifecycleConfig(t)
cfg.Startup.WorkloadConvergenceWaitSeconds = 0
cfg.Startup.WorkloadConvergencePollSeconds = 0
cfg.Startup.IgnoreUnavailableNodes = []string{"titan-22"}
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get deploy,statefulset,daemonset -A -o json"):
return `{"items":[{"kind":"","metadata":{"namespace":"monitoring","name":"skip"}},{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"replicas-zero"},"spec":{"replicas":0,"template":{"spec":{}}},"status":{"readyReplicas":0}},{"kind":"DaemonSet","metadata":{"namespace":"apps","name":"on-ignored"},"spec":{"template":{"spec":{"nodeSelector":{"kubernetes.io/hostname":"titan-22"}}}},"status":{"desiredNumberScheduled":1,"numberReady":0}}]}`, nil
case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
return `{"items":[{"metadata":{"namespace":"","name":"skip"}},{"metadata":{"namespace":"apps","name":"stuck","creationTimestamp":"2000-01-01T00:00:00Z","ownerReferences":[{"kind":"ReplicaSet"}]},"status":{"phase":"Pending","containerStatuses":[{"state":{"waiting":{"reason":"CrashLoopBackOff"}}}]}}]}`, nil
default:
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
}
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
if err := orch.TestHookWaitForWorkloadConvergence(context.Background()); err != nil {
t.Fatalf("expected workload convergence default-branch success, got %v", err)
}
cfgIgnore := lifecycleConfig(t)
cfgIgnore.Startup.AutoRecycleStuckPods = false
orchIgnoreDry, _ := newHookOrchestratorAdvanced(t, cfgIgnore, true, run, run)
// Hour-old last-run timestamp so the interval gate does not short-circuit.
now := time.Now().UTC().Add(-time.Hour)
orchIgnoreDry.TestHookMaybeAutoRecycleStuckPods(context.Background(), &now)
orchIgnoreDry.TestHookMaybeAutoHealCriticalWorkloadReplicas(context.Background(), &now)
runHealErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
if name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json") {
return "", errors.New("workload query failed")
}
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
orchHealErr, _ := newHookOrchestratorAdvanced(t, lifecycleConfig(t), false, runHealErr, runHealErr)
if _, err := orchHealErr.TestHookHealCriticalWorkloadReplicas(context.Background()); err == nil || !strings.Contains(err.Error(), "query workloads") {
t.Fatalf("expected critical workload heal query-error branch, got %v", err)
}
})
})
}