// Source: ananke/internal/cluster/orchestrator_unit_additional_test.go (Go, 484 lines, 19 KiB)

package cluster
import (
"context"
"errors"
"io"
"log"
"net"
"path/filepath"
"strings"
"testing"
"time"
"scm.bstein.dev/bstein/ananke/internal/config"
"scm.bstein.dev/bstein/ananke/internal/execx"
"scm.bstein.dev/bstein/ananke/internal/state"
)
// commandStub pairs a command matcher with the canned result handed back when
// the matcher accepts an invocation.
type commandStub struct {
// match reports whether this stub should answer the command name/args pair.
match func(name string, args []string) bool
// out is the output string returned for a matching command.
out string
// err is the error returned for a matching command (nil when unset).
err error
}
// buildOrchestratorWithStubs assembles an Orchestrator whose command overrides
// answer from the supplied stubs instead of the real runner.
// Signature: buildOrchestratorWithStubs(t *testing.T, cfg config.Config, stubs []commandStub) *Orchestrator.
// Why: helper centralizes deterministic command dispatch for fast, isolated unit tests.
func buildOrchestratorWithStubs(t *testing.T, cfg config.Config, stubs []commandStub) *Orchestrator {
	t.Helper()

	// Fill in any state locations the caller left blank; the reports dir and
	// run history default to paths underneath the state dir.
	paths := &cfg.State
	if paths.Dir == "" {
		paths.Dir = t.TempDir()
	}
	if paths.ReportsDir == "" {
		paths.ReportsDir = filepath.Join(paths.Dir, "reports")
	}
	if paths.RunHistoryPath == "" {
		paths.RunHistoryPath = filepath.Join(paths.Dir, "runs.json")
	}

	// The first stub whose matcher accepts the command wins; commands that no
	// stub matches succeed with empty output.
	respond := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
		for i := range stubs {
			if candidate := stubs[i]; candidate.match(name, args) {
				return candidate.out, candidate.err
			}
		}
		return "", nil
	}

	// Both the regular and the sensitive override share the same dispatcher.
	return &Orchestrator{
		cfg:                  cfg,
		runner:               &execx.Runner{},
		store:                state.New(cfg.State.RunHistoryPath),
		log:                  log.New(io.Discard, "", 0),
		runOverride:          respond,
		runSensitiveOverride: respond,
	}
}
// matchContains builds a matcher that accepts a command only when its name
// equals cmd and every part occurs in the space-joined argument list.
// Signature: matchContains(cmd string, parts ...string) func(string, []string) bool.
// Why: concise substring matching keeps command stubs readable across many tests.
func matchContains(cmd string, parts ...string) func(string, []string) bool {
	return func(name string, args []string) bool {
		matched := name == cmd
		if matched {
			// Parts are checked against the joined arguments, so one part may
			// span several adjacent arguments (for example "get pods").
			haystack := strings.Join(args, " ")
			for i := 0; matched && i < len(parts); i++ {
				matched = strings.Contains(haystack, parts[i])
			}
		}
		return matched
	}
}
// TestNewConstructsOrchestrator verifies that New returns a non-nil
// orchestrator holding the exact runner and store it was given.
// Signature: TestNewConstructsOrchestrator(t *testing.T).
// Why: covers constructor path in orchestrator core module.
func TestNewConstructsOrchestrator(t *testing.T) {
	historyPath := filepath.Join(t.TempDir(), "runs.json")
	cfg := config.Config{State: config.State{RunHistoryPath: historyPath}}
	runner := &execx.Runner{}
	store := state.New(historyPath)
	quiet := log.New(io.Discard, "", 0)

	got := New(cfg, runner, store, quiet)
	wired := got != nil && got.runner == runner && got.store == store
	if !wired {
		t.Fatalf("constructor returned unexpected orchestrator: %#v", got)
	}
}
// TestParseSnapshotPathFromEtcdSnapshotList runs one orchestration or CLI step.
// Signature: TestParseSnapshotPathFromEtcdSnapshotList(t *testing.T).
// Why: covers snapshot-path parser branches including header skip and no-match.
func TestParseSnapshotPathFromEtcdSnapshotList(t *testing.T) {
out := strings.Join([]string{
"Name Size Created Location",
`pre-shutdown 4.2M now "file:///var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"`,
}, "\n")
got := parseSnapshotPathFromEtcdSnapshotList(out)
if got != "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown" {
t.Fatalf("unexpected snapshot path: %q", got)
}
if parseSnapshotPathFromEtcdSnapshotList("no snapshots") != "" {
t.Fatalf("expected no snapshot path")
}
}
// TestFluxSourceHelpers runs one orchestration or CLI step.
// Signature: TestFluxSourceHelpers(t *testing.T).
// Why: covers flux source readiness/guard/branch patch helper flows.
func TestFluxSourceHelpers(t *testing.T) {
cfg := config.Config{
ExpectedFluxSource: "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git",
}
orch := buildOrchestratorWithStubs(t, cfg, []commandStub{
{match: matchContains("kubectl", "jsonpath={.status.conditions"), out: "True"},
{match: matchContains("kubectl", "jsonpath={.spec.url}"), out: "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git"},
{match: matchContains("kubectl", "jsonpath={.spec.ref.branch}"), out: "main"},
{match: matchContains("kubectl", "patch", "gitrepository"), out: ""},
})
ready, err := orch.fluxSourceReady(context.Background())
if err != nil || !ready {
t.Fatalf("expected flux source ready, got ready=%v err=%v", ready, err)
}
if err := orch.guardFluxSourceDrift(context.Background(), "main", false); err != nil {
t.Fatalf("guardFluxSourceDrift failed: %v", err)
}
if err := orch.ensureFluxBranch(context.Background(), "main", false); err != nil {
t.Fatalf("ensureFluxBranch no-op failed: %v", err)
}
if got := normalizeGitURL(" SSH://Git@Host/Repo.git/ "); got != "ssh://git@host/repo" {
t.Fatalf("unexpected normalized url: %q", got)
}
}
// TestCoordinationHelpers checks intentAge on a ten-second-old intent,
// intentFresh on a zero-value intent, shellQuote on an embedded single quote,
// and that coordinationPeers returns two peers for the configured hosts.
// Signature: TestCoordinationHelpers(t *testing.T).
// Why: covers intent-age helpers, shell quoting, and peer selection logic.
func TestCoordinationHelpers(t *testing.T) {
	stale := state.Intent{UpdatedAt: time.Now().Add(-10 * time.Second)}
	if age := intentAge(stale); age <= 0 {
		t.Fatalf("expected positive age")
	}

	var zero state.Intent
	if fresh := intentFresh(zero, time.Second); !fresh {
		t.Fatalf("zero timestamp should be fresh")
	}

	if quoted := shellQuote("a'b"); quoted != `'a'"'"'b'` {
		t.Fatalf("unexpected shell quote output")
	}

	// "titan-24" is listed twice and "titan-db" doubles as the forward host.
	coordination := config.Coordination{
		PeerHosts:           []string{"titan-24", "titan-24", "titan-db"},
		ForwardShutdownHost: "titan-db",
	}
	orch := buildOrchestratorWithStubs(t, config.Config{Coordination: coordination}, nil)
	if peers := orch.coordinationPeers(); len(peers) != 2 {
		t.Fatalf("expected deduped peers, got %v", peers)
	}
}
// TestVerifyEtcdSnapshotAndRunSudoK3S runs one orchestration or CLI step.
// Signature: TestVerifyEtcdSnapshotAndRunSudoK3S(t *testing.T).
// Why: covers k3s command fallback and snapshot verification happy path.
func TestVerifyEtcdSnapshotAndRunSudoK3S(t *testing.T) {
orch := buildOrchestratorWithStubs(t, config.Config{}, []commandStub{
{match: matchContains("ssh", "stat -c %s"), out: "2097152"},
{match: matchContains("ssh", "k3s etcd-snapshot ls"), out: "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"},
{match: matchContains("ssh", "sha256sum"), out: strings.Repeat("a", 64)},
})
if err := orch.verifyEtcdSnapshot(context.Background(), "titan-0a", "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"); err != nil {
t.Fatalf("verifyEtcdSnapshot failed: %v", err)
}
}
// TestScalingHelpers runs one orchestration or CLI step.
// Signature: TestScalingHelpers(t *testing.T).
// Why: covers workload discovery, snapshot IO, and scale command orchestration.
func TestScalingHelpers(t *testing.T) {
cfg := config.Config{
ExcludedNamespaces: []string{"kube-system"},
State: config.State{Dir: t.TempDir()},
}
orch := buildOrchestratorWithStubs(t, cfg, []commandStub{
{
match: matchContains("kubectl", "get deployment", "jsonpath"),
out: strings.Join([]string{
"default\tgrafana\t1",
"kube-system\tcoredns\t2",
"",
}, "\n"),
},
{
match: matchContains("kubectl", "get statefulset", "jsonpath"),
out: "vault\tvault\t1\n",
},
{match: matchContains("kubectl", "scale", "deployment", "grafana"), out: ""},
{match: matchContains("kubectl", "scale", "statefulset", "vault"), out: ""},
})
entries, err := orch.listScalableWorkloads(context.Background())
if err != nil {
t.Fatalf("listScalableWorkloads failed: %v", err)
}
if len(entries) != 2 {
t.Fatalf("expected 2 scalable entries, got %d (%v)", len(entries), entries)
}
if err := orch.writeScaledWorkloadSnapshot(entries); err != nil {
t.Fatalf("writeScaledWorkloadSnapshot failed: %v", err)
}
snapshot, err := orch.readScaledWorkloadSnapshot()
if err != nil || snapshot == nil || len(snapshot.Entries) != 2 {
t.Fatalf("readScaledWorkloadSnapshot failed snapshot=%v err=%v", snapshot, err)
}
if err := orch.scaleWorkloads(context.Background(), entries, -1, 2); err != nil {
t.Fatalf("scaleWorkloads failed: %v", err)
}
}
// TestStorageReadyAndWorkloadHelpers verifies that storageReady reports ready
// when the stubbed Longhorn node listing and the critical PVC lookup both
// return healthy values.
// Signature: TestStorageReadyAndWorkloadHelpers(t *testing.T).
// Why: covers storage readiness checks and workload helper utilities.
func TestStorageReadyAndWorkloadHelpers(t *testing.T) {
	startup := config.Startup{
		StorageMinReadyNodes: 1,
		StorageCriticalPVCs:  []string{"vault/data-vault-0"},
	}
	stubs := []commandStub{
		{match: matchContains("kubectl", "nodes.longhorn.io"), out: "titan-23:True:True\n"},
		{match: matchContains("kubectl", "get pvc data-vault-0"), out: "Bound"},
	}
	orch := buildOrchestratorWithStubs(t, config.Config{Startup: startup}, stubs)

	ready, reason, err := orch.storageReady(context.Background())
	if !ready || err != nil {
		t.Fatalf("expected storageReady true, got ok=%v reason=%q err=%v", ready, reason, err)
	}
}
// TestIngressAndServiceHelpers verifies that discoverIngressHosts drops the
// ignored host from a stubbed ingress listing, and spot-checks hostFromURL and
// isLikelyHostname.
// Signature: TestIngressAndServiceHelpers(t *testing.T).
// Why: covers ingress host discovery helpers and URL parsing helpers.
func TestIngressAndServiceHelpers(t *testing.T) {
	// Two ingresses: one real host and one that is on the ignore list.
	const ingressJSON = `{"items":[{"metadata":{"namespace":"gitea"},"spec":{"rules":[{"host":"scm.bstein.dev"}]}},{"metadata":{"namespace":"x"},"spec":{"rules":[{"host":"ignore.bstein.dev"}]}}]}`

	var cfg config.Config
	cfg.Startup.IngressChecklistIgnoreHosts = []string{"ignore.bstein.dev"}
	stubs := []commandStub{
		{match: matchContains("kubectl", "get ingress", "-A", "-o", "json"), out: ingressJSON},
	}
	orch := buildOrchestratorWithStubs(t, cfg, stubs)

	found, err := orch.discoverIngressHosts(context.Background())
	// Short-circuit order matters: found[0] is only read when exactly one host came back.
	onlyExpected := err == nil && len(found) == 1 && found[0] == "scm.bstein.dev"
	if !onlyExpected {
		t.Fatalf("discoverIngressHosts unexpected hosts=%v err=%v", found, err)
	}

	host := hostFromURL("https://metrics.bstein.dev/api/health")
	if host != "metrics.bstein.dev" {
		t.Fatalf("unexpected hostFromURL value: %q", host)
	}

	acceptsHostname := isLikelyHostname("metrics.bstein.dev")
	if !acceptsHostname || isLikelyHostname("bad path/value") {
		t.Fatalf("isLikelyHostname classification mismatch")
	}
}
// TestWorkloadConvergenceHelpers checks desiredReady on a deployment with two
// desired and one ready replica, podControllerOwned on a ReplicaSet-owned pod,
// and stuckContainerReason on a container waiting in CrashLoopBackOff.
// Signature: TestWorkloadConvergenceHelpers(t *testing.T).
// Why: covers controller readiness helpers and stuck-pod heuristics.
func TestWorkloadConvergenceHelpers(t *testing.T) {
	wantReplicas := int32(2)
	var deploy workloadResource
	deploy.Kind = "deployment"
	deploy.Spec.Replicas = &wantReplicas
	deploy.Status.ReadyReplicas = 1

	desired, ready, ok := desiredReady(deploy)
	if matches := ok && desired == 2 && ready == 1; !matches {
		t.Fatalf("desiredReady mismatch desired=%d ready=%d ok=%v", desired, ready, ok)
	}

	pod := podResource{}
	pod.Metadata.OwnerReferences = []ownerReference{{Kind: "ReplicaSet"}}
	if owned := podControllerOwned(pod); !owned {
		t.Fatalf("expected podControllerOwned=true")
	}

	waiting := &podContainerWaitingState{Reason: "CrashLoopBackOff"}
	pod.Status.ContainerStatuses = []podContainerStatus{
		{State: podContainerState{Waiting: waiting}},
	}
	watched := map[string]struct{}{"CrashLoopBackOff": {}}
	if got := stuckContainerReason(pod, watched); got != "CrashLoopBackOff" {
		t.Fatalf("unexpected stuck reason: %q", got)
	}
}
// TestDrainAndK3SHelpers runs one orchestration or CLI step.
// Signature: TestDrainAndK3SHelpers(t *testing.T).
// Why: covers node drain diagnostics and k3s snapshot selection flow.
func TestDrainAndK3SHelpers(t *testing.T) {
cfg := config.Config{
SSHManagedNodes: []string{"titan-0a"},
}
orch := buildOrchestratorWithStubs(t, cfg, []commandStub{
{match: matchContains("kubectl", "get pods", "--field-selector", "spec.nodeName=titan-22"), out: "vault vault-0 Running StatefulSet\n"},
{match: matchContains("ssh", "k3s etcd-snapshot ls"), out: "pre-shutdown /var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"},
})
diag := orch.drainNodeDiagnostics(context.Background(), "titan-22")
if !strings.Contains(diag, "vault/vault-0") {
t.Fatalf("unexpected diagnostics output: %q", diag)
}
snapshot, err := orch.latestEtcdSnapshotPath(context.Background(), "titan-0a")
if err != nil || snapshot == "" {
t.Fatalf("latestEtcdSnapshotPath failed snapshot=%q err=%v", snapshot, err)
}
}
// TestTimesyncAndInventoryHelpers checks that inventoryNodesForValidation
// returns at least three nodes for the configured hosts, spot-checks
// parseDatastoreEndpoint and isTimeSynced, and confirms tcpReachable succeeds
// against a local listener.
// Signature: TestTimesyncAndInventoryHelpers(t *testing.T).
// Why: covers time sync helpers, datastore endpoint parsing, and inventory assembly.
func TestTimesyncAndInventoryHelpers(t *testing.T) {
	cfg := config.Config{
		ControlPlanes:   []string{"titan-0a"},
		Workers:         []string{"titan-22"},
		SSHManagedNodes: []string{"titan-0a", "titan-22"},
		SSHNodeHosts:    map[string]string{"titan-db": "10.0.0.10"},
		Coordination: config.Coordination{
			PeerHosts:           []string{"titan-24"},
			ForwardShutdownHost: "titan-db",
		},
	}
	orch := buildOrchestratorWithStubs(t, cfg, nil)

	inventory := orch.inventoryNodesForValidation()
	if len(inventory) < 3 {
		t.Fatalf("expected combined inventory nodes, got %v", inventory)
	}

	const unitLine = "ExecStart=/usr/local/bin/k3s server --datastore-endpoint=postgres://x"
	if endpoint := parseDatastoreEndpoint(unitLine); endpoint == "" {
		t.Fatalf("expected datastore endpoint parse")
	}

	upperYes := isTimeSynced("YES")
	if !upperYes || isTimeSynced("no") {
		t.Fatalf("unexpected isTimeSynced behavior")
	}

	// Port 0 lets the OS pick a free loopback port for the reachability probe.
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen failed: %v", err)
	}
	defer listener.Close()

	addr := listener.Addr().String()
	if reachable := orch.tcpReachable(addr, 500*time.Millisecond); !reachable {
		t.Fatalf("expected tcpReachable=true for open listener")
	}
}
// TestShutdownModeValidation verifies that normalizeShutdownMode passes
// "cluster-only" through unchanged and returns an error for an unknown mode.
// Signature: TestShutdownModeValidation(t *testing.T).
// Why: covers the accepted cluster-only mode and the invalid-mode error.
func TestShutdownModeValidation(t *testing.T) {
	mode, err := normalizeShutdownMode("cluster-only")
	if accepted := err == nil && mode == "cluster-only"; !accepted {
		t.Fatalf("expected cluster-only mode, got mode=%q err=%v", mode, err)
	}

	_, bogusErr := normalizeShutdownMode("bogus")
	if bogusErr == nil {
		t.Fatalf("expected invalid mode error")
	}
}
// TestWaitForAPIDryRunShortCircuit runs one orchestration or CLI step.
// Signature: TestWaitForAPIDryRunShortCircuit(t *testing.T).
// Why: covers dry-run short-circuit branch for api readiness wait.
func TestWaitForAPIDryRunShortCircuit(t *testing.T) {
orch := &Orchestrator{runner: &execx.Runner{DryRun: true}}
if err := orch.waitForAPI(context.Background(), 1, time.Millisecond); err != nil {
t.Fatalf("expected dry-run waitForAPI to pass: %v", err)
}
}
// TestGuardFluxSourceDriftMismatch verifies that guardFluxSourceDrift returns
// an error when the stubbed source URL differs from the expected one, and
// again when the URL matches but the stubbed branch differs.
// Signature: TestGuardFluxSourceDriftMismatch(t *testing.T).
// Why: covers url-drift and branch-drift error branches.
func TestGuardFluxSourceDriftMismatch(t *testing.T) {
	const expectedURL = "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git"

	ctx := context.Background()
	cfg := config.Config{ExpectedFluxSource: expectedURL}

	// Case 1: the cluster reports a different repository URL.
	wrongURL := []commandStub{
		{match: matchContains("kubectl", "jsonpath={.spec.url}"), out: "ssh://git@scm.bstein.dev:2242/bstein/wrong.git"},
	}
	urlOrch := buildOrchestratorWithStubs(t, cfg, wrongURL)
	urlErr := urlOrch.guardFluxSourceDrift(ctx, "main", false)
	if urlErr == nil {
		t.Fatalf("expected guardFluxSourceDrift mismatch error")
	}

	// Case 2: the URL is right but the tracked branch is not "main".
	wrongBranch := []commandStub{
		{match: matchContains("kubectl", "jsonpath={.spec.url}"), out: expectedURL},
		{match: matchContains("kubectl", "jsonpath={.spec.ref.branch}"), out: "atlasbot"},
	}
	branchOrch := buildOrchestratorWithStubs(t, cfg, wrongBranch)
	branchErr := branchOrch.guardFluxSourceDrift(ctx, "main", false)
	if branchErr == nil {
		t.Fatalf("expected branch drift error")
	}
}
// TestRunSudoK3SFailsWhenAllCandidatesFail runs one orchestration or CLI step.
// Signature: TestRunSudoK3SFailsWhenAllCandidatesFail(t *testing.T).
// Why: covers fallback failure return in runSudoK3S.
func TestRunSudoK3SFailsWhenAllCandidatesFail(t *testing.T) {
orch := buildOrchestratorWithStubs(t, config.Config{}, []commandStub{
{match: matchContains("ssh", "k3s"), err: errors.New("no binary")},
})
if _, err := orch.runSudoK3S(context.Background(), "titan-0a", "server"); err == nil {
t.Fatalf("expected runSudoK3S failure when all candidates fail")
}
}
// TestCriticalEndpointHelpers verifies that criticalServiceEndpointsReady
// reports success with detail "services=1" when the stubbed endpoints lookup
// returns addresses, and that parseCriticalServiceEndpoint splits a
// namespace/service pair and rejects input without a separator.
// Signature: TestCriticalEndpointHelpers(t *testing.T).
// Why: covers critical endpoint parsing and readiness checks that gate startup completion.
func TestCriticalEndpointHelpers(t *testing.T) {
	const endpoint = "monitoring/victoria-metrics-single-server"

	var cfg config.Config
	cfg.Startup.CriticalServiceEndpoints = []string{endpoint}
	stubs := []commandStub{
		{match: matchContains("kubectl", "get endpoints victoria-metrics-single-server"), out: "10.42.0.10\n10.42.0.11\n"},
	}
	orch := buildOrchestratorWithStubs(t, cfg, stubs)

	ready, detail, ns, svc, err := orch.criticalServiceEndpointsReady(context.Background())
	if !ready || err != nil {
		t.Fatalf("expected criticalServiceEndpointsReady success, got ok=%v detail=%q ns=%q svc=%q err=%v", ready, detail, ns, svc, err)
	}
	if detail != "services=1" {
		t.Fatalf("unexpected readiness detail: %q", detail)
	}

	parsedNS, parsedSvc, parseErr := parseCriticalServiceEndpoint(endpoint)
	parsedOK := parseErr == nil && parsedNS == "monitoring" && parsedSvc == "victoria-metrics-single-server"
	if !parsedOK {
		t.Fatalf("unexpected parse result ns=%q svc=%q err=%v", parsedNS, parsedSvc, parseErr)
	}

	_, _, invalidErr := parseCriticalServiceEndpoint("invalid")
	if invalidErr == nil {
		t.Fatalf("expected parseCriticalServiceEndpoint error")
	}
}
// TestCriticalEndpointAutoHealWorkflow verifies that
// waitForCriticalServiceEndpoints succeeds when the first endpoints lookup
// comes back empty and later lookups return an address.
// Signature: TestCriticalEndpointAutoHealWorkflow(t *testing.T).
// Why: covers endpoint-zero recovery where startup heals workload replicas before succeeding.
func TestCriticalEndpointAutoHealWorkflow(t *testing.T) {
	cfg := config.Config{
		Startup: config.Startup{
			CriticalServiceEndpointWaitSec: 2,
			CriticalServiceEndpointPollSec: 1,
			CriticalServiceEndpoints:       []string{"monitoring/victoria-metrics-single-server"},
		},
		// Only the state root is set here. buildOrchestratorWithStubs derives
		// the reports dir and run-history path beneath it, so all state lives
		// in one temp directory instead of three unrelated ones.
		State: config.State{Dir: t.TempDir()},
	}
	// Build through the shared helper for consistent wiring; the stub list is
	// nil because this test installs its own stateful dispatcher below.
	orch := buildOrchestratorWithStubs(t, cfg, nil)

	endpointChecks := 0
	dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
		joined := name + " " + strings.Join(args, " ")
		switch {
		case strings.Contains(joined, "get endpoints victoria-metrics-single-server"):
			// First lookup reports no endpoints; every later lookup reports one.
			endpointChecks++
			if endpointChecks == 1 {
				return "", nil
			}
			return "10.42.0.10\n", nil
		case strings.Contains(joined, "scale deployment victoria-metrics-single-server"):
			// The workload is not a deployment, so scaling it as one fails.
			return "", errors.New(`Error from server (NotFound): deployments.apps "victoria-metrics-single-server" not found`)
		case strings.Contains(joined, "scale statefulset victoria-metrics-single-server"):
			return "", nil
		case strings.Contains(joined, "rollout status statefulset/victoria-metrics-single-server"):
			return "statefulset rolled out", nil
		}
		// Any other command succeeds with empty output.
		return "", nil
	}
	orch.runOverride = dispatch
	orch.runSensitiveOverride = dispatch

	if err := orch.waitForCriticalServiceEndpoints(context.Background()); err != nil {
		t.Fatalf("waitForCriticalServiceEndpoints failed: %v", err)
	}
	if endpointChecks < 2 {
		t.Fatalf("expected repeated endpoint checks, got %d", endpointChecks)
	}
}