package orchestrator

import (
	"context"
	"encoding/base64"
	"errors"
	"io"
	"log"
	"net"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"scm.bstein.dev/bstein/ananke/internal/cluster"
	"scm.bstein.dev/bstein/ananke/internal/config"
	"scm.bstein.dev/bstein/ananke/internal/execx"
	"scm.bstein.dev/bstein/ananke/internal/state"
)

// newHookOrchestratorAdvanced builds an orchestrator for hook-level tests,
// wiring optional overrides for regular and sensitive command execution.
// Signature: newHookOrchestratorAdvanced(t *testing.T, cfg config.Config, dryRun bool, run commandOverride, runSensitive commandOverride) (*cluster.Orchestrator, *commandRecorder).
// Why: this part10 matrix needs dry-run and non-dry-run variants while keeping
// command dispatch deterministic from the top-level testing module.
func newHookOrchestratorAdvanced(
	t *testing.T,
	cfg config.Config,
	dryRun bool,
	run func(context.Context, time.Duration, string, ...string) (string, error),
	runSensitive func(context.Context, time.Duration, string, ...string) (string, error),
) (*cluster.Orchestrator, *commandRecorder) {
	t.Helper()
	if err := os.MkdirAll(cfg.State.Dir, 0o755); err != nil {
		t.Fatalf("ensure state dir: %v", err)
	}
	recorder := &commandRecorder{}
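	// Nil overrides fall back to the shared lifecycle dispatcher so commands
	// a scenario does not stub still resolve deterministically and are
	// captured by the returned recorder.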
	if run == nil || runSensitive == nil {
		base := lifecycleDispatcher(recorder)
		if run == nil {
			run = base
		}
		if runSensitive == nil {
			runSensitive = base
		}
	}
	orch := cluster.New(cfg, &execx.Runner{DryRun: dryRun}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
	orch.SetCommandOverrides(run, runSensitive)
	return orch, recorder
}

// TestHookGapMatrixPart10LowFileClosure exercises remaining low-coverage orchestrator branches.
// Signature: TestHookGapMatrixPart10LowFileClosure(t *testing.T).
// Why: closes remaining branch gaps on low-coverage orchestrator files using
// targeted hook-level scenarios instead of brittle full-drill reruns.
func TestHookGapMatrixPart10LowFileClosure(t *testing.T) {
	t.Run("critical-vault-low-branches", func(t *testing.T) {
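		// Malformed JSON from `vault status -format=json` should surface as a
		// parse error rather than a sealed/unsealed verdict.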
t.Run("vault-sealed-parse-error", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
if name == "kubectl" && strings.Contains(command, "vault status -format=json") {
|
|
return "{invalid-json", nil
|
|
}
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
|
|
if _, err := orch.TestHookVaultSealed(context.Background()); err == nil || !strings.Contains(err.Error(), "parse vault status") {
|
|
t.Fatalf("expected vault status parse error branch, got %v", err)
|
|
}
|
|
})
|
|
|
|
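		// A base64-encoded single space decodes to whitespace only, which the
		// unseal-key lookup should reject as an empty key.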
t.Run("vault-unseal-key-empty-decoded", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
cfg.Startup.VaultUnsealBreakglassCommand = ""
|
|
encodedEmpty := base64.StdEncoding.EncodeToString([]byte(" "))
|
|
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
if name == "kubectl" && strings.Contains(command, "get secret vault-init") {
|
|
return encodedEmpty, nil
|
|
}
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
|
|
if _, err := orch.TestHookVaultUnsealKey(context.Background()); err == nil || !strings.Contains(err.Error(), "vault-init unseal key is empty") {
|
|
t.Fatalf("expected empty decoded unseal key branch, got %v", err)
|
|
}
|
|
})
|
|
|
|
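		// Pointing the key file at a directory (t.TempDir) makes the write
		// fail, exercising the error-wrapping branch without touching
		// permissions.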
t.Run("write-unseal-key-file-write-error", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
cfg.Startup.VaultUnsealKeyFile = t.TempDir()
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, nil, nil)
|
|
if err := orch.TestHookWriteVaultUnsealKeyFile("vault-key"); err == nil || !strings.Contains(err.Error(), "write vault unseal key file") {
|
|
t.Fatalf("expected write failure branch when key path is a directory, got %v", err)
|
|
}
|
|
})
|
|
|
|
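		// kubectl's jsonpath prints the literal "<no value>" when the field is
		// absent; readiness must treat that as not ready, not as an error.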
t.Run("workload-ready-no-value-and-ensure-error", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
runNoValue := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
if name == "kubectl" && strings.Contains(command, "jsonpath={.status.readyReplicas}") {
|
|
return "<no value>", nil
|
|
}
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
orchNoValue, _ := newHookOrchestratorAdvanced(t, cfg, false, runNoValue, runNoValue)
|
|
ready, err := orchNoValue.TestHookWorkloadReady(context.Background(), "vault", "statefulset", "vault")
|
|
if err != nil || ready {
|
|
t.Fatalf("expected no-value readiness branch, ready=%v err=%v", ready, err)
|
|
}
|
|
|
|
runEnsureErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
switch {
|
|
case name == "kubectl" && strings.Contains(command, " scale "):
|
|
return "", nil
|
|
case name == "kubectl" && strings.Contains(command, "get pods -o custom-columns"):
|
|
return "", nil
|
|
case name == "kubectl" && strings.Contains(command, "rollout status statefulset/victoria-metrics-single-server"):
|
|
return "", errors.New("rollout failed")
|
|
default:
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
}
|
|
orchEnsureErr, _ := newHookOrchestratorAdvanced(t, cfg, false, runEnsureErr, runEnsureErr)
|
|
if err := orchEnsureErr.TestHookEnsureCriticalStartupWorkloads(context.Background()); err == nil || !strings.Contains(err.Error(), "rollout failed") {
|
|
t.Fatalf("expected ensureCriticalStartupWorkloads wait error branch, got %v", err)
|
|
}
|
|
})
|
|
|
|
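		// First pass: a Pending vault-0 pod should stop unsealing at the pod
		// phase guard. Second pass: the unseal command succeeds but the
		// follow-up status check errors, covering post-unseal verification.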
t.Run("ensure-vault-unsealed-phase-and-followup-errors", func(t *testing.T) {
|
|
cfgPhase := lifecycleConfig(t)
|
|
runPhase := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
if name == "kubectl" && strings.Contains(command, "get pod vault-0") {
|
|
return "Pending", nil
|
|
}
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
orchPhase, _ := newHookOrchestratorAdvanced(t, cfgPhase, false, runPhase, runPhase)
|
|
if err := orchPhase.TestHookEnsureVaultUnsealed(context.Background()); err == nil || !strings.Contains(err.Error(), "pod phase") {
|
|
t.Fatalf("expected pod phase guard branch, got %v", err)
|
|
}
|
|
|
|
cfgFollowup := lifecycleConfig(t)
|
|
sealedCalls := 0
|
|
runFollowup := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
switch {
|
|
case name == "kubectl" && strings.Contains(command, "get pod vault-0"):
|
|
return "Running", nil
|
|
case name == "kubectl" && strings.Contains(command, "get secret vault-init"):
|
|
return base64.StdEncoding.EncodeToString([]byte("vault-key")), nil
|
|
case name == "kubectl" && strings.Contains(command, "vault status -format=json"):
|
|
sealedCalls++
|
|
if sealedCalls == 1 {
|
|
return `{"sealed":true}`, nil
|
|
}
|
|
return "", errors.New("vault status failed")
|
|
default:
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
}
|
|
runSensitive := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
if name == "kubectl" && strings.Contains(command, "vault operator unseal") {
|
|
return "ok", nil
|
|
}
|
|
return runFollowup(ctx, timeout, name, args...)
|
|
}
|
|
orchFollowup, _ := newHookOrchestratorAdvanced(t, cfgFollowup, false, runFollowup, runSensitive)
|
|
if err := orchFollowup.TestHookEnsureVaultUnsealed(context.Background()); err == nil || !strings.Contains(err.Error(), "vault status check failed") {
|
|
t.Fatalf("expected follow-up sealed status error branch, got %v", err)
|
|
}
|
|
})
|
|
})
|
|
|
|
t.Run("drain-and-scaling-low-branches", func(t *testing.T) {
|
|
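		// All five cordon/drain attempts fail, so the aggregated error should
		// report the count; the malformed diagnostics row must be skipped.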
t.Run("drain-workers-error-aggregation-and-diagnostics", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
cfg.Shutdown.DrainParallelism = 0
|
|
workers := []string{"w1", "w2", "w3", "w4", "w5"}
|
|
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
switch {
|
|
case name == "kubectl" && strings.Contains(command, "cordon "):
|
|
return "", errors.New("cordon denied")
|
|
case name == "kubectl" && strings.Contains(command, "drain "):
|
|
return "", errors.New("drain denied")
|
|
case name == "kubectl" && strings.Contains(command, "get pods -A --field-selector"):
|
|
return strings.Join([]string{
|
|
"malformed",
|
|
"ns p1 Running Deployment",
|
|
"ns p2 Running Deployment",
|
|
"ns p3 Running Deployment",
|
|
"ns p4 Running Deployment",
|
|
"ns p5 Running Deployment",
|
|
"ns p6 Running Deployment",
|
|
"ns p7 Running Deployment",
|
|
}, "\n"), nil
|
|
default:
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
}
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
|
|
err := orch.TestHookDrainWorkers(context.Background(), workers)
|
|
if err == nil || !strings.Contains(err.Error(), "drain workers had 5 errors") {
|
|
t.Fatalf("expected drain aggregation branch, got %v", err)
|
|
}
|
|
})
|
|
|
|
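		// SSHParallelism far above the node count should clamp to the roster
		// size, and hosts outside SSHManagedNodes must be skipped entirely.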
t.Run("run-ssh-across-nodes-clamp-and-skip", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
cfg.Shutdown.SSHParallelism = 99
|
|
cfg.SSHManagedNodes = []string{"titan-db"}
|
|
rec := &commandRecorder{}
|
|
base := lifecycleDispatcher(rec)
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, base, base)
|
|
orch.TestHookRunSSHAcrossNodes(context.Background(), []string{"titan-db", "not-managed"}, "noop", "echo ok")
|
|
if !rec.contains("atlas@titan-db echo ok") {
|
|
t.Fatalf("expected managed ssh execution branch")
|
|
}
|
|
})
|
|
|
|
t.Run("latest-etcd-snapshot-empty-list", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
if name == "ssh" && strings.Contains(command, "etcd-snapshot ls") {
|
|
return "NAME LOCATION", nil
|
|
}
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
|
|
if _, err := orch.TestHookLatestEtcdSnapshotPath(context.Background(), "titan-db"); err == nil || !strings.Contains(err.Error(), "no etcd snapshots found") {
|
|
t.Fatalf("expected empty snapshot-list branch, got %v", err)
|
|
}
|
|
})
|
|
|
|
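		// With cfg.Workers empty and the API down, effective workers should
		// fall back to the SSH-managed inventory, skipping control planes and
		// blank entries.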
t.Run("scaling-fallback-and-snapshot-writer-errors", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
cfg.Workers = nil
|
|
cfg.SSHManagedNodes = []string{"titan-db", "titan-23", " "}
|
|
cfg.ControlPlanes = []string{"titan-db"}
|
|
runWorkers := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
if name == "kubectl" && len(args) >= 2 && args[0] == "get" && args[1] == "nodes" {
|
|
return "", errors.New("api down")
|
|
}
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
orchWorkers, _ := newHookOrchestratorAdvanced(t, cfg, false, runWorkers, runWorkers)
|
|
workers, err := orchWorkers.TestHookEffectiveWorkers(context.Background())
|
|
if err != nil || len(workers) == 0 {
|
|
t.Fatalf("expected inventory worker fallback branch, workers=%v err=%v", workers, err)
|
|
}
|
|
|
|
cfgWrite := lifecycleConfig(t)
|
|
cfgWrite.State.Dir = t.TempDir()
|
|
// Force write failure by making the snapshot output path a directory.
|
|
if err := os.MkdirAll(filepath.Join(cfgWrite.State.Dir, "scaled-workloads.json"), 0o755); err != nil {
|
|
t.Fatalf("create snapshot blocker directory: %v", err)
|
|
}
|
|
runWrite := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
switch {
|
|
case name == "kubectl" && strings.Contains(command, "get deployment -A -o jsonpath="):
|
|
return "monitoring\tgrafana\t1\n", nil
|
|
case name == "kubectl" && strings.Contains(command, "get statefulset -A -o jsonpath="):
|
|
return "", nil
|
|
default:
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
}
|
|
orchWrite, _ := newHookOrchestratorAdvanced(t, cfgWrite, false, runWrite, runWrite)
|
|
if err := orchWrite.TestHookScaleDownApps(context.Background()); err == nil || !strings.Contains(err.Error(), "write scaled workload snapshot") {
|
|
t.Fatalf("expected scaled snapshot write-failure branch, got %v", err)
|
|
}
|
|
})
|
|
})
|
|
|
|
t.Run("flux-ingress-service-and-inventory-low-branches", func(t *testing.T) {
|
|
t.Run("flux-health-ready-reason-fallback-and-heal-query-error", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
runReady := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
switch {
|
|
case name == "kubectl" && strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
|
|
return `{"items":[{"metadata":{"namespace":"","name":"skip"},"spec":{"suspend":false},"status":{"conditions":[]}},{"metadata":{"namespace":"flux-system","name":"apps"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"False","reason":"","message":""}]}}]}`, nil
|
|
case name == "kubectl" && strings.Contains(command, "get jobs -A -o json"):
|
|
return "", errors.New("jobs query failed")
|
|
default:
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
}
|
|
orchReady, _ := newHookOrchestratorAdvanced(t, cfg, false, runReady, runReady)
|
|
ready, detail, err := orchReady.TestHookFluxHealthReady(context.Background())
|
|
if err != nil || ready || !strings.Contains(detail, "ready=false") {
|
|
t.Fatalf("expected flux ready-reason fallback branch, ready=%v detail=%q err=%v", ready, detail, err)
|
|
}
|
|
if _, err := orchReady.TestHookHealImmutableFluxJobs(context.Background()); err == nil || !strings.Contains(err.Error(), "query jobs") {
|
|
t.Fatalf("expected immutable-job query error branch, got %v", err)
|
|
}
|
|
})
|
|
|
|
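		// The kustomization reports the immutable-job failure message, but the
		// context is canceled up front, so the wait should return
		// context.Canceled instead of polling until the deadline.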
t.Run("wait-for-flux-health-immutable-heal-path-with-cancel", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
cfg.Startup.FluxHealthWaitSeconds = 1
|
|
cfg.Startup.FluxHealthPollSeconds = 1
|
|
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
switch {
|
|
case name == "kubectl" && strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
|
|
return `{"items":[{"metadata":{"namespace":"flux-system","name":"apps"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"False","message":"Job failed: field is immutable","reason":"ReconciliationFailed"}]}}]}`, nil
|
|
case name == "kubectl" && strings.Contains(command, "get jobs -A -o json"):
|
|
return "", errors.New("jobs unavailable")
|
|
default:
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
}
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel()
|
|
if err := orch.TestHookWaitForFluxHealth(ctx); !errors.Is(err, context.Canceled) {
|
|
t.Fatalf("expected canceled flux wait branch, got %v", err)
|
|
}
|
|
})
|
|
|
|
t.Run("required-node-labels-skip-empty-and-apply-one", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
cfg.Startup.RequiredNodeLabels = map[string]map[string]string{
|
|
"node-empty-map": {},
|
|
"node-empty-val": {"zone": " "},
|
|
"node-apply": {"zone": "lab-a"},
|
|
}
|
|
rec := &commandRecorder{}
|
|
base := lifecycleDispatcher(rec)
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, base, base)
|
|
if err := orch.TestHookEnsureRequiredNodeLabels(context.Background()); err != nil {
|
|
t.Fatalf("expected ensureRequiredNodeLabels skip/apply branches, got %v", err)
|
|
}
|
|
if !rec.contains("label node node-apply --overwrite zone=lab-a") {
|
|
t.Fatalf("expected non-empty label application branch")
|
|
}
|
|
})
|
|
|
|
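		// Dry-run should short-circuit convergence immediately; the second
		// orchestrator narrows the checklist to critical endpoints only, so a
		// failing `kubectl get endpoints` is the sole gating error.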
t.Run("wait-for-startup-convergence-dryrun-and-critical-endpoint-fail", func(t *testing.T) {
|
|
cfgDry := lifecycleConfig(t)
|
|
orchDry, _ := newHookOrchestratorAdvanced(t, cfgDry, true, nil, nil)
|
|
if err := orchDry.TestHookWaitForStartupConvergence(context.Background()); err != nil {
|
|
t.Fatalf("expected startup convergence dry-run fast path, got %v", err)
|
|
}
|
|
|
|
cfgFail := lifecycleConfig(t)
|
|
cfgFail.Startup.RequireIngressChecklist = false
|
|
cfgFail.Startup.RequireServiceChecklist = false
|
|
cfgFail.Startup.RequireFluxHealth = false
|
|
cfgFail.Startup.RequireWorkloadConvergence = false
|
|
cfgFail.Startup.RequireCriticalServiceEndpoints = true
|
|
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
if name == "kubectl" && strings.Contains(command, "get endpoints") {
|
|
return "", errors.New("endpoint query failed")
|
|
}
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
orchFail, _ := newHookOrchestratorAdvanced(t, cfgFail, false, run, run)
|
|
if err := orchFail.TestHookWaitForStartupConvergence(context.Background()); err == nil || !strings.Contains(err.Error(), "query endpoints") {
|
|
t.Fatalf("expected critical-endpoint convergence failure branch, got %v", err)
|
|
}
|
|
})
|
|
|
|
t.Run("ingress-namespace-discovery-empty-and-query-error", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
orchEmpty, _ := newHookOrchestratorAdvanced(t, cfg, false, nil, nil)
|
|
namespaces, err := orchEmpty.TestHookDiscoverIngressNamespacesForHost(context.Background(), " ")
|
|
if err != nil || len(namespaces) != 0 {
|
|
t.Fatalf("expected empty-host fast path, namespaces=%v err=%v", namespaces, err)
|
|
}
|
|
|
|
runErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
if name == "kubectl" && strings.Contains(command, "get ingress -A -o json") {
|
|
return "", errors.New("ingress query failed")
|
|
}
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
orchErr, _ := newHookOrchestratorAdvanced(t, cfg, false, runErr, runErr)
|
|
if _, err := orchErr.TestHookDiscoverIngressNamespacesForHost(context.Background(), "metrics.bstein.dev"); err == nil || !strings.Contains(err.Error(), "query ingresses") {
|
|
t.Fatalf("expected ingress query error branch, got %v", err)
|
|
}
|
|
})
|
|
|
|
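		// A throwaway local listener answers one request with 503 so the
		// checklist fails; the empty Name should fall back to the URL in the
		// failure detail.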
t.Run("service-checklist-name-fallback-and-contains-empty-marker", func(t *testing.T) {
|
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
|
if err != nil {
|
|
t.Fatalf("open local listener: %v", err)
|
|
}
|
|
defer listener.Close()
|
|
go func() {
|
|
conn, acceptErr := listener.Accept()
|
|
if acceptErr == nil {
|
|
_, _ = conn.Write([]byte("HTTP/1.1 503 Service Unavailable\r\nContent-Length: 3\r\n\r\nbad"))
|
|
_ = conn.Close()
|
|
}
|
|
}()
|
|
|
|
cfg := lifecycleConfig(t)
|
|
cfg.Startup.ServiceChecklist = []config.ServiceChecklistCheck{{
|
|
Name: "",
|
|
URL: "http://" + listener.Addr().String() + "/health",
|
|
AcceptedStatuses: []int{200},
|
|
}}
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, nil, nil)
|
|
ready, detail := orch.TestHookServiceChecklistReady(context.Background())
|
|
if ready || !strings.Contains(detail, "http://") {
|
|
t.Fatalf("expected service checklist URL-name fallback failure, ready=%v detail=%q", ready, detail)
|
|
}
|
|
if !cluster.TestHookChecklistContains("any body", " ") {
|
|
t.Fatalf("expected empty-marker compact contains branch")
|
|
}
|
|
})
|
|
|
|
t.Run("node-inventory-reachability-defaults-and-cancel", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
cfg.Startup.RequireNodeInventoryReach = true
|
|
cfg.Startup.NodeInventoryReachWaitSeconds = 0
|
|
cfg.Startup.NodeInventoryReachPollSeconds = 0
|
|
cfg.Startup.IgnoreUnavailableNodes = []string{"titan-23"}
|
|
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
if name == "ssh" && strings.Contains(command, "__ANANKE_NODE_REACHABLE__") {
|
|
return "unexpected", nil
|
|
}
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel()
|
|
if err := orch.TestHookWaitForNodeInventoryReachability(ctx); !errors.Is(err, context.Canceled) {
|
|
t.Fatalf("expected canceled inventory reachability branch, got %v", err)
|
|
}
|
|
})
|
|
})
|
|
|
|
t.Run("poststart-timesync-workload-branches", func(t *testing.T) {
|
|
t.Run("poststart-default-poll-and-cancel", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
cfg.Startup.PostStartProbes = []string{"https://metrics.bstein.dev/api/health"}
|
|
cfg.Startup.PostStartProbeWaitSeconds = 0
|
|
cfg.Startup.PostStartProbePollSeconds = 0
|
|
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
if name == "curl" {
|
|
return "", errors.New("curl failed")
|
|
}
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel()
|
|
if err := orch.TestHookWaitForPostStartProbes(ctx); !errors.Is(err, context.Canceled) {
|
|
t.Fatalf("expected canceled post-start branch, got %v", err)
|
|
}
|
|
})
|
|
|
|
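		// A fake `flux` executable on PATH makes the command-exists probe
		// pass, while the override fails every flux invocation; reconcile
		// failures should only warn, never fail the resume step.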
t.Run("resume-flux-with-commandexists-and-flux-fail-warning-path", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
fakeBin := t.TempDir()
|
|
fluxPath := filepath.Join(fakeBin, "flux")
|
|
if err := os.WriteFile(fluxPath, []byte("#!/bin/sh\nexit 0\n"), 0o755); err != nil {
|
|
t.Fatalf("write fake flux binary: %v", err)
|
|
}
|
|
t.Setenv("PATH", fakeBin+string(os.PathListSeparator)+os.Getenv("PATH"))
|
|
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
if name == "flux" {
|
|
return "", errors.New("flux reconcile failed")
|
|
}
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
|
|
if err := orch.TestHookResumeFluxAndReconcile(context.Background()); err != nil {
|
|
t.Fatalf("expected resume flux warning-only branch, got %v", err)
|
|
}
|
|
})
|
|
|
|
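		// Quorum mode with zero wait/poll/quorum settings should fall back to
		// defaults; the canceled context then exercises both the time-sync
		// wait and the external-datastore preflight cancellation branches.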
t.Run("timesync-defaults-quorum-and-datastore-cancel", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
cfg.Startup.TimeSyncMode = "quorum"
|
|
cfg.Startup.TimeSyncWaitSeconds = 0
|
|
cfg.Startup.TimeSyncPollSeconds = 0
|
|
cfg.Startup.TimeSyncQuorum = 0
|
|
cfg.SSHManagedNodes = []string{"titan-db"}
|
|
cfg.SSHNodeHosts["db-host"] = "127.0.0.1"
|
|
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
switch {
|
|
case name == "sh" && strings.Contains(command, "timedatectl show -p NTPSynchronized"):
|
|
return "no", nil
|
|
case name == "ssh" && strings.Contains(command, "timedatectl show -p NTPSynchronized"):
|
|
return "", errors.New("ssh timed out")
|
|
case name == "ssh" && strings.Contains(command, "systemctl cat k3s"):
|
|
return "ExecStart=/usr/local/bin/k3s server --datastore-endpoint=postgres://127.0.0.1/k3s", nil
|
|
default:
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
}
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel()
|
|
if err := orch.TestHookWaitForTimeSync(ctx, []string{"", "titan-db"}); !errors.Is(err, context.Canceled) {
|
|
t.Fatalf("expected canceled time-sync branch, got %v", err)
|
|
}
|
|
if err := orch.TestHookPreflightExternalDatastore(ctx); !errors.Is(err, context.Canceled) {
|
|
t.Fatalf("expected canceled datastore preflight branch, got %v", err)
|
|
}
|
|
})
|
|
|
|
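		// Convergence should tolerate an empty kind, a zero-replica
		// deployment, and a daemonset pinned to an ignored node; the
		// maybe-auto-recycle and maybe-auto-heal hooks are then invoked on
		// their disabled/dry-run quick paths.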
t.Run("workload-convergence-and-ignore-quick-branches", func(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
cfg.Startup.WorkloadConvergenceWaitSeconds = 0
|
|
cfg.Startup.WorkloadConvergencePollSeconds = 0
|
|
cfg.Startup.IgnoreUnavailableNodes = []string{"titan-22"}
|
|
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
switch {
|
|
case name == "kubectl" && strings.Contains(command, "get deploy,statefulset,daemonset -A -o json"):
|
|
return `{"items":[{"kind":"","metadata":{"namespace":"monitoring","name":"skip"}},{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"replicas-zero"},"spec":{"replicas":0,"template":{"spec":{}}},"status":{"readyReplicas":0}},{"kind":"DaemonSet","metadata":{"namespace":"apps","name":"on-ignored"},"spec":{"template":{"spec":{"nodeSelector":{"kubernetes.io/hostname":"titan-22"}}}},"status":{"desiredNumberScheduled":1,"numberReady":0}}]}`, nil
|
|
case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
|
|
return `{"items":[{"metadata":{"namespace":"","name":"skip"}},{"metadata":{"namespace":"apps","name":"stuck","creationTimestamp":"2000-01-01T00:00:00Z","ownerReferences":[{"kind":"ReplicaSet"}]},"status":{"phase":"Pending","containerStatuses":[{"state":{"waiting":{"reason":"CrashLoopBackOff"}}}]}}]}`, nil
|
|
default:
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
}
|
|
orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
|
|
if err := orch.TestHookWaitForWorkloadConvergence(context.Background()); err != nil {
|
|
t.Fatalf("expected workload convergence default-branch success, got %v", err)
|
|
}
|
|
|
|
cfgIgnore := lifecycleConfig(t)
|
|
cfgIgnore.Startup.AutoRecycleStuckPods = false
|
|
orchIgnoreDry, _ := newHookOrchestratorAdvanced(t, cfgIgnore, true, run, run)
|
|
now := time.Now().UTC().Add(-time.Hour)
|
|
orchIgnoreDry.TestHookMaybeAutoRecycleStuckPods(context.Background(), &now)
|
|
orchIgnoreDry.TestHookMaybeAutoHealCriticalWorkloadReplicas(context.Background(), &now)
|
|
|
|
runHealErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
if name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json") {
|
|
return "", errors.New("workload query failed")
|
|
}
|
|
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
|
|
}
|
|
orchHealErr, _ := newHookOrchestratorAdvanced(t, lifecycleConfig(t), false, runHealErr, runHealErr)
|
|
if _, err := orchHealErr.TestHookHealCriticalWorkloadReplicas(context.Background()); err == nil || !strings.Contains(err.Error(), "query workloads") {
|
|
t.Fatalf("expected critical workload heal query-error branch, got %v", err)
|
|
}
|
|
})
|
|
})
|
|
}
|