// File: ananke/testing/orchestrator/drill_recovery_test.go

package orchestrator
import (
"context"
"errors"
"io"
"log"
"strings"
"testing"
"time"
"scm.bstein.dev/bstein/ananke/internal/cluster"
"scm.bstein.dev/bstein/ananke/internal/execx"
"scm.bstein.dev/bstein/ananke/internal/state"
)
// TestLifecycleEtcdRestoreHappyPath exercises the non-dry-run etcd restore
// flow end to end: k3s unit inspection, snapshot discovery via
// `etcd-snapshot ls`, size and sha256 verification, and the final
// k3s cluster-reset invocation.
func TestLifecycleEtcdRestoreHappyPath(t *testing.T) {
	orch, recorder, _ := newLifecycleOrchestrator(t)
	orch.SetCommandOverrides(func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
		fullCmd := name + " " + strings.Join(args, " ")
		recorder.record(name, args)
		// Only ssh invocations get canned replies; everything else is a no-op.
		if name != "ssh" {
			return "", nil
		}
		if strings.Contains(fullCmd, "sudo systemctl cat k3s") {
			return "ExecStart=/usr/local/bin/k3s server", nil
		}
		if strings.Contains(fullCmd, "etcd-snapshot ls") {
			return "Name Size Created Location\npre-shutdown 4.2M now \"file:///var/lib/rancher/k3s/server/db/snapshots/pre-shutdown\"\n", nil
		}
		if strings.Contains(fullCmd, "stat -c %s") {
			return "2097152", nil
		}
		if strings.Contains(fullCmd, "sha256sum") {
			return "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", nil
		}
		return "ok", nil
	}, nil)
	if err := orch.EtcdRestore(context.Background(), cluster.EtcdRestoreOptions{}); err != nil {
		t.Fatalf("etcd restore failed: %v", err)
	}
	if !recorder.contains("--cluster-reset-restore-path") {
		t.Fatalf("expected k3s cluster-reset command in call log")
	}
}
// TestLifecycleEtcdRestoreRejectsExternalDatastore verifies that EtcdRestore
// aborts with ErrEtcdRestoreNotApplicable when the k3s unit on the control
// plane is configured with an external datastore endpoint instead of
// embedded etcd.
func TestLifecycleEtcdRestoreRejectsExternalDatastore(t *testing.T) {
	cfg := lifecycleConfig(t)
	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), testLogger())
	orch.SetCommandOverrides(func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
		joined := name + " " + strings.Join(args, " ")
		// Only the unit-file inspection matters; every other command is inert.
		if name != "ssh" || !strings.Contains(joined, "sudo systemctl cat k3s") {
			return "", nil
		}
		return "ExecStart=/usr/local/bin/k3s server --datastore-endpoint=postgres://db:5432/k3s", nil
	}, nil)
	err := orch.EtcdRestore(context.Background(), cluster.EtcdRestoreOptions{ControlPlane: "titan-db"})
	switch {
	case err == nil:
		t.Fatalf("expected external datastore restore rejection")
	case !errors.Is(err, cluster.ErrEtcdRestoreNotApplicable):
		t.Fatalf("expected ErrEtcdRestoreNotApplicable, got %v", err)
	}
}
// TestLifecycleStartupCriticalEndpointAutoHeal runs a full Startup pass with
// flux health and workload convergence disabled, isolating the critical-service
// endpoint checklist.
// Signature: TestLifecycleStartupCriticalEndpointAutoHeal(t *testing.T).
// Why: covers endpoint checklist self-heal path that restores backend replicas before declaring startup complete.
func TestLifecycleStartupCriticalEndpointAutoHeal(t *testing.T) {
cfg := lifecycleConfig(t)
// Disable unrelated startup gates so only the endpoint checklist runs;
// short wait/poll windows keep the retry loop fast in tests.
cfg.Startup.RequireFluxHealth = false
cfg.Startup.RequireWorkloadConvergence = false
cfg.Startup.CriticalServiceEndpointWaitSec = 3
cfg.Startup.CriticalServiceEndpointPollSec = 1
recorder := &commandRecorder{}
orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), testLogger())
// healed flips to true once the orchestrator issues the scale command;
// from then on the endpoint query below reports a ready address.
var healed bool
dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
recorder.record(name, args)
command := name + " " + strings.Join(args, " ")
switch name {
case "ssh":
// Reachability probe echoes its sentinel; unit inspection reports embedded etcd mode.
if strings.Contains(command, "__ANANKE_NODE_REACHABLE__") {
return "__ANANKE_NODE_REACHABLE__", nil
}
if strings.Contains(command, "systemctl cat k3s") {
return "ExecStart=/usr/local/bin/k3s server", nil
}
return "ok", nil
case "kubectl":
// NOTE: case order matters — the specific victoria-metrics matches must
// precede the generic "scale "/"rollout status" catch-alls below.
switch {
case strings.Contains(command, "version --request-timeout=5s"):
return "v1.31.0", nil
case strings.Contains(command, "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}"):
return "True", nil
case strings.Contains(command, "jsonpath={.spec.url}"):
return cfg.ExpectedFluxSource, nil
case strings.Contains(command, "jsonpath={.spec.ref.branch}"):
return cfg.ExpectedFluxBranch, nil
case strings.Contains(command, "get endpoints victoria-metrics-single-server"):
// Endpoint is empty until the self-heal scale command has been issued.
if healed {
return "10.42.0.10\n", nil
}
return "", nil
case strings.Contains(command, "scale statefulset victoria-metrics-single-server"):
// The self-heal action under test: record it and unblock the endpoint.
healed = true
return "", nil
case strings.Contains(command, "scale "), strings.Contains(command, "rollout status"):
return "ok", nil
case strings.Contains(command, "get pods -A -o json"):
return `{"items":[]}`, nil
case strings.Contains(command, "get deploy,statefulset -A -o json"):
return `{"items":[{"kind":"StatefulSet","metadata":{"namespace":"monitoring","name":"victoria-metrics-single-server"},"spec":{"replicas":1}}]}`, nil
case strings.Contains(command, "jsonpath={.status.readyReplicas}"):
return "1", nil
case strings.Contains(command, "get deployment -A -o jsonpath="):
return "monitoring\tgrafana\t1\n", nil
case strings.Contains(command, "get statefulset -A -o jsonpath="):
return "monitoring\tvictoria-metrics-single-server\t1\n", nil
case strings.Contains(command, "-n flux-system get kustomizations.kustomize.toolkit.fluxcd.io -o jsonpath="):
return "", nil
case strings.Contains(command, "get helmreleases.helm.toolkit.fluxcd.io -A -o jsonpath="):
return "", nil
default:
return "", nil
}
default:
return "", nil
}
}
orch.SetCommandOverrides(dispatch, dispatch)
if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "critical-endpoint-heal"}); err != nil {
t.Fatalf("startup with endpoint auto-heal failed: %v", err)
}
// The heal must have actually been attempted, not just tolerated.
if !recorder.contains("scale statefulset victoria-metrics-single-server") {
t.Fatalf("expected endpoint auto-heal scale command in call log")
}
}
// TestLifecycleStartupFluxImmutableJobSelfHeal runs a full Startup pass with
// endpoint and workload convergence checks disabled, isolating flux health
// convergence.
// Signature: TestLifecycleStartupFluxImmutableJobSelfHeal(t *testing.T).
// Why: covers immutable-job detection and stale failed job cleanup during flux convergence.
func TestLifecycleStartupFluxImmutableJobSelfHeal(t *testing.T) {
cfg := lifecycleConfig(t)
// Only the flux health gate runs; short wait/poll keeps retries fast.
cfg.Startup.RequireCriticalServiceEndpoints = false
cfg.Startup.RequireWorkloadConvergence = false
cfg.Startup.FluxHealthWaitSeconds = 4
cfg.Startup.FluxHealthPollSeconds = 1
recorder := &commandRecorder{}
orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), testLogger())
// fluxCalls counts kustomization polls: the first two report the
// "field is immutable" Job failure, later polls report Ready.
fluxCalls := 0
dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
recorder.record(name, args)
command := name + " " + strings.Join(args, " ")
switch name {
case "ssh":
// Reachability probe echoes its sentinel; unit inspection reports embedded etcd mode.
if strings.Contains(command, "__ANANKE_NODE_REACHABLE__") {
return "__ANANKE_NODE_REACHABLE__", nil
}
if strings.Contains(command, "systemctl cat k3s") {
return "ExecStart=/usr/local/bin/k3s server", nil
}
return "ok", nil
case "kubectl":
// NOTE: case order matters — specific substrings must precede the
// generic "patch "/"annotate "/"scale " catch-alls.
switch {
case strings.Contains(command, "version --request-timeout=5s"):
return "v1.31.0", nil
case strings.Contains(command, "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}"):
return "True", nil
case strings.Contains(command, "jsonpath={.spec.url}"):
return cfg.ExpectedFluxSource, nil
case strings.Contains(command, "jsonpath={.spec.ref.branch}"):
return cfg.ExpectedFluxBranch, nil
case strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
// First two polls surface the immutable-field Job failure that
// should trigger stale job cleanup; afterwards flux is healthy.
fluxCalls++
if fluxCalls <= 2 {
return `{"items":[{"metadata":{"namespace":"flux-system","name":"services"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"False","message":"Job update failed: field is immutable"}]}}]}`, nil
}
return `{"items":[{"metadata":{"namespace":"flux-system","name":"services"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"True","message":"ok"}]}}]}`, nil
case strings.Contains(command, "get jobs -A -o json"):
// One failed job owned by the "services" kustomization — the
// cleanup candidate the orchestrator is expected to delete.
return `{"items":[{"metadata":{"namespace":"flux-system","name":"reconcile-services","labels":{"kustomize.toolkit.fluxcd.io/name":"services"}},"status":{"failed":1,"conditions":[{"type":"Failed","status":"True"}]}}]}`, nil
case strings.Contains(command, "delete job reconcile-services"):
return "", nil
case strings.Contains(command, "-n flux-system get kustomizations.kustomize.toolkit.fluxcd.io -o jsonpath="):
return "services\n", nil
case strings.Contains(command, "get helmreleases.helm.toolkit.fluxcd.io -A -o jsonpath="):
return "", nil
case strings.Contains(command, "patch "), strings.Contains(command, "annotate "):
return "", nil
case strings.Contains(command, "get deployment -A -o jsonpath="):
return "monitoring\tgrafana\t1\n", nil
case strings.Contains(command, "get statefulset -A -o jsonpath="):
return "monitoring\tvictoria-metrics-single-server\t1\n", nil
case strings.Contains(command, "scale "), strings.Contains(command, "rollout status"):
return "", nil
case strings.Contains(command, "jsonpath={.status.readyReplicas}"):
return "1", nil
case strings.Contains(command, "get pods -A -o json"):
return `{"items":[]}`, nil
default:
return "", nil
}
case "flux":
// flux CLI reconcile calls succeed silently.
return "", nil
default:
return "", nil
}
}
orch.SetCommandOverrides(dispatch, dispatch)
if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "immutable-job-heal"}); err != nil {
t.Fatalf("startup with immutable-job heal failed: %v", err)
}
// Cleanup of the stale failed job must have been issued during convergence.
if !recorder.contains("delete job reconcile-services") {
t.Fatalf("expected stale failed flux job deletion in call log")
}
}
// testLogger returns a *log.Logger that discards all output (no prefix, no
// flags), keeping orchestration recovery tests quiet while still satisfying
// the logger dependency of cluster.New.
func testLogger() *log.Logger {
	return log.New(io.Discard, "", 0)
}