package orchestrator
import (
	"context"
	"errors"
	"io"
	"log"
	"strings"
	"testing"
	"time"

	"scm.bstein.dev/bstein/ananke/internal/cluster"
	"scm.bstein.dev/bstein/ananke/internal/execx"
	"scm.bstein.dev/bstein/ananke/internal/state"
)
// TestLifecycleEtcdRestoreHappyPath runs one orchestration or CLI step.
|
|
// Signature: TestLifecycleEtcdRestoreHappyPath(t *testing.T).
|
|
// Why: covers non-dry-run etcd restore flow including snapshot discovery, verification, and k3s reset.
|
|
func TestLifecycleEtcdRestoreHappyPath(t *testing.T) {
|
|
orch, recorder, _ := newLifecycleOrchestrator(t)
|
|
orch.SetCommandOverrides(func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
recorder.record(name, args)
|
|
switch name {
|
|
case "ssh":
|
|
switch {
|
|
case strings.Contains(command, "sudo systemctl cat k3s"):
|
|
return "ExecStart=/usr/local/bin/k3s server", nil
|
|
case strings.Contains(command, "etcd-snapshot ls"):
|
|
return "Name Size Created Location\npre-shutdown 4.2M now \"file:///var/lib/rancher/k3s/server/db/snapshots/pre-shutdown\"\n", nil
|
|
case strings.Contains(command, "stat -c %s"):
|
|
return "2097152", nil
|
|
case strings.Contains(command, "sha256sum"):
|
|
return "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", nil
|
|
default:
|
|
return "ok", nil
|
|
}
|
|
default:
|
|
return "", nil
|
|
}
|
|
}, nil)
|
|
|
|
if err := orch.EtcdRestore(context.Background(), cluster.EtcdRestoreOptions{}); err != nil {
|
|
t.Fatalf("etcd restore failed: %v", err)
|
|
}
|
|
if !recorder.contains("--cluster-reset-restore-path") {
|
|
t.Fatalf("expected k3s cluster-reset command in call log")
|
|
}
|
|
}
|
|
|
|
// TestLifecycleEtcdRestoreRejectsExternalDatastore runs one orchestration or CLI step.
|
|
// Signature: TestLifecycleEtcdRestoreRejectsExternalDatastore(t *testing.T).
|
|
// Why: ensures restore exits early when k3s is configured for external datastore mode.
|
|
func TestLifecycleEtcdRestoreRejectsExternalDatastore(t *testing.T) {
|
|
cfg := lifecycleConfig(t)
|
|
orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), testLogger())
|
|
orch.SetCommandOverrides(func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
if name == "ssh" && strings.Contains(command, "sudo systemctl cat k3s") {
|
|
return "ExecStart=/usr/local/bin/k3s server --datastore-endpoint=postgres://db:5432/k3s", nil
|
|
}
|
|
return "", nil
|
|
}, nil)
|
|
|
|
err := orch.EtcdRestore(context.Background(), cluster.EtcdRestoreOptions{ControlPlane: "titan-db"})
|
|
if err == nil {
|
|
t.Fatalf("expected external datastore restore rejection")
|
|
}
|
|
if !errors.Is(err, cluster.ErrEtcdRestoreNotApplicable) {
|
|
t.Fatalf("expected ErrEtcdRestoreNotApplicable, got %v", err)
|
|
}
|
|
}
|
|
|
|
// TestLifecycleStartupCriticalEndpointAutoHeal runs one orchestration or CLI step.
// Signature: TestLifecycleStartupCriticalEndpointAutoHeal(t *testing.T).
// Why: covers endpoint checklist self-heal path that restores backend replicas before declaring startup complete.
func TestLifecycleStartupCriticalEndpointAutoHeal(t *testing.T) {
	cfg := lifecycleConfig(t)
	// Narrow the run to the endpoint checklist: the flux-health and
	// workload-convergence gates are disabled, and the endpoint wait/poll
	// windows are shortened so the test finishes quickly.
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false
	cfg.Startup.CriticalServiceEndpointWaitSec = 3
	cfg.Startup.CriticalServiceEndpointPollSec = 1

	recorder := &commandRecorder{}
	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), testLogger())
	// healed flips to true once the orchestrator issues the statefulset scale
	// command; until then the endpoint query below returns no addresses.
	var healed bool
	dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
		recorder.record(name, args)
		command := name + " " + strings.Join(args, " ")
		switch name {
		case "ssh":
			// Reachability probe: echo the sentinel back.
			if strings.Contains(command, "__ANANKE_NODE_REACHABLE__") {
				return "__ANANKE_NODE_REACHABLE__", nil
			}
			// Unit file with no --datastore-endpoint flag (embedded datastore).
			if strings.Contains(command, "systemctl cat k3s") {
				return "ExecStart=/usr/local/bin/k3s server", nil
			}
			return "ok", nil
		case "kubectl":
			// NOTE: cases are ordered most-specific first; e.g. the
			// victoria-metrics scale case must stay above the generic
			// "scale " case or it would never match.
			switch {
			case strings.Contains(command, "version --request-timeout=5s"):
				return "v1.31.0", nil
			case strings.Contains(command, "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}"):
				// Every node reports Ready.
				return "True", nil
			case strings.Contains(command, "jsonpath={.spec.url}"):
				// Flux source matches the configured expectations.
				return cfg.ExpectedFluxSource, nil
			case strings.Contains(command, "jsonpath={.spec.ref.branch}"):
				return cfg.ExpectedFluxBranch, nil
			case strings.Contains(command, "get endpoints victoria-metrics-single-server"):
				// No endpoint addresses until the heal scale command has run.
				if healed {
					return "10.42.0.10\n", nil
				}
				return "", nil
			case strings.Contains(command, "scale statefulset victoria-metrics-single-server"):
				// The auto-heal action under test: once seen, later endpoint
				// polls report an address.
				healed = true
				return "", nil
			case strings.Contains(command, "scale "), strings.Contains(command, "rollout status"):
				return "ok", nil
			case strings.Contains(command, "get pods -A -o json"):
				return `{"items":[]}`, nil
			case strings.Contains(command, "get deploy,statefulset -A -o json"):
				// A single statefulset backs the critical endpoint.
				return `{"items":[{"kind":"StatefulSet","metadata":{"namespace":"monitoring","name":"victoria-metrics-single-server"},"spec":{"replicas":1}}]}`, nil
			case strings.Contains(command, "jsonpath={.status.readyReplicas}"):
				return "1", nil
			case strings.Contains(command, "get deployment -A -o jsonpath="):
				return "monitoring\tgrafana\t1\n", nil
			case strings.Contains(command, "get statefulset -A -o jsonpath="):
				return "monitoring\tvictoria-metrics-single-server\t1\n", nil
			case strings.Contains(command, "-n flux-system get kustomizations.kustomize.toolkit.fluxcd.io -o jsonpath="):
				return "", nil
			case strings.Contains(command, "get helmreleases.helm.toolkit.fluxcd.io -A -o jsonpath="):
				return "", nil
			default:
				return "", nil
			}
		default:
			return "", nil
		}
	}
	// Same dispatcher for both override slots.
	orch.SetCommandOverrides(dispatch, dispatch)

	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "critical-endpoint-heal"}); err != nil {
		t.Fatalf("startup with endpoint auto-heal failed: %v", err)
	}
	if !recorder.contains("scale statefulset victoria-metrics-single-server") {
		t.Fatalf("expected endpoint auto-heal scale command in call log")
	}
}
// TestLifecycleStartupFluxImmutableJobSelfHeal runs one orchestration or CLI step.
// Signature: TestLifecycleStartupFluxImmutableJobSelfHeal(t *testing.T).
// Why: covers immutable-job detection and stale failed job cleanup during flux convergence.
func TestLifecycleStartupFluxImmutableJobSelfHeal(t *testing.T) {
	cfg := lifecycleConfig(t)
	// Narrow the run to flux convergence: endpoint and workload gates are
	// disabled, and the flux-health wait/poll windows are shortened.
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireWorkloadConvergence = false
	cfg.Startup.FluxHealthWaitSeconds = 4
	cfg.Startup.FluxHealthPollSeconds = 1

	recorder := &commandRecorder{}
	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), testLogger())
	// fluxCalls counts kustomization list polls: the first two report a
	// not-Ready kustomization with an immutable-Job error, later polls Ready.
	fluxCalls := 0
	dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
		recorder.record(name, args)
		command := name + " " + strings.Join(args, " ")
		switch name {
		case "ssh":
			// Reachability probe: echo the sentinel back.
			if strings.Contains(command, "__ANANKE_NODE_REACHABLE__") {
				return "__ANANKE_NODE_REACHABLE__", nil
			}
			// Unit file with no --datastore-endpoint flag (embedded datastore).
			if strings.Contains(command, "systemctl cat k3s") {
				return "ExecStart=/usr/local/bin/k3s server", nil
			}
			return "ok", nil
		case "kubectl":
			// NOTE: cases are Contains-based and ordered most-specific first.
			switch {
			case strings.Contains(command, "version --request-timeout=5s"):
				return "v1.31.0", nil
			case strings.Contains(command, "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}"):
				// Every node reports Ready.
				return "True", nil
			case strings.Contains(command, "jsonpath={.spec.url}"):
				// Flux source matches the configured expectations.
				return cfg.ExpectedFluxSource, nil
			case strings.Contains(command, "jsonpath={.spec.ref.branch}"):
				return cfg.ExpectedFluxBranch, nil
			case strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
				fluxCalls++
				if fluxCalls <= 2 {
					// Not Ready: the message carries the "field is immutable"
					// Job update failure that should trigger the self-heal.
					return `{"items":[{"metadata":{"namespace":"flux-system","name":"services"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"False","message":"Job update failed: field is immutable"}]}}]}`, nil
				}
				// After the heal: kustomization reports Ready.
				return `{"items":[{"metadata":{"namespace":"flux-system","name":"services"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"True","message":"ok"}]}}]}`, nil
			case strings.Contains(command, "get jobs -A -o json"):
				// One failed job owned by the "services" kustomization; this
				// is the stale job the orchestrator is expected to delete.
				return `{"items":[{"metadata":{"namespace":"flux-system","name":"reconcile-services","labels":{"kustomize.toolkit.fluxcd.io/name":"services"}},"status":{"failed":1,"conditions":[{"type":"Failed","status":"True"}]}}]}`, nil
			case strings.Contains(command, "delete job reconcile-services"):
				return "", nil
			case strings.Contains(command, "-n flux-system get kustomizations.kustomize.toolkit.fluxcd.io -o jsonpath="):
				return "services\n", nil
			case strings.Contains(command, "get helmreleases.helm.toolkit.fluxcd.io -A -o jsonpath="):
				return "", nil
			case strings.Contains(command, "patch "), strings.Contains(command, "annotate "):
				return "", nil
			case strings.Contains(command, "get deployment -A -o jsonpath="):
				return "monitoring\tgrafana\t1\n", nil
			case strings.Contains(command, "get statefulset -A -o jsonpath="):
				return "monitoring\tvictoria-metrics-single-server\t1\n", nil
			case strings.Contains(command, "scale "), strings.Contains(command, "rollout status"):
				return "", nil
			case strings.Contains(command, "jsonpath={.status.readyReplicas}"):
				return "1", nil
			case strings.Contains(command, "get pods -A -o json"):
				return `{"items":[]}`, nil
			default:
				return "", nil
			}
		case "flux":
			// Flux CLI invocations succeed silently.
			return "", nil
		default:
			return "", nil
		}
	}
	// Same dispatcher for both override slots.
	orch.SetCommandOverrides(dispatch, dispatch)

	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "immutable-job-heal"}); err != nil {
		t.Fatalf("startup with immutable-job heal failed: %v", err)
	}
	if !recorder.contains("delete job reconcile-services") {
		t.Fatalf("expected stale failed flux job deletion in call log")
	}
}
// testLogger returns a *log.Logger whose output is discarded.
//
// Why: centralizes discarded logging for focused orchestration recovery
// tests, which assert on recorded commands rather than log output.
func testLogger() *log.Logger {
	discarded := log.New(io.Discard, "", 0)
	return discarded
}
|