package orchestrator

import (
	"context"
	"errors"
	"strings"
	"testing"
	"time"

	"scm.bstein.dev/bstein/ananke/internal/cluster"
)

// TestHookSchedulingStormHelpers exercises the owner-resolution, observation-count,
// and last-observed-timestamp helper hooks exposed by the cluster package.
// Signature: TestHookSchedulingStormHelpers(t *testing.T).
// Why: keeps scheduling-storm helper coverage in the split top-level testing module
// required by the repo hygiene contract.
func TestHookSchedulingStormHelpers(t *testing.T) {
	// Owner resolution: two lookups that must succeed and one that must fail.
	deployKey, deployOK := cluster.TestHookSchedulingStormOwnerWorkload("ai", "ReplicaSet", "ollama-rs", "Deployment", "ollama")
	if deployKey != "ai/deployment/ollama" || !deployOK {
		t.Fatalf("unexpected deployment owner resolution: got=%q ok=%v", deployKey, deployOK)
	}
	stsKey, stsOK := cluster.TestHookSchedulingStormOwnerWorkload("storage", "StatefulSet", "nextcloud", "", "")
	if stsKey != "storage/statefulset/nextcloud" || !stsOK {
		t.Fatalf("unexpected statefulset owner resolution: got=%q ok=%v", stsKey, stsOK)
	}
	orphanKey, orphanOK := cluster.TestHookSchedulingStormOwnerWorkload("ai", "ReplicaSet", "missing", "", "")
	if orphanKey != "" || orphanOK {
		t.Fatalf("expected missing replicaset owner lookup to fail, got=%q ok=%v", orphanKey, orphanOK)
	}

	// Observation count: the second argument wins when larger, and 0/0 becomes 1.
	seriesCount := cluster.TestHookEventObservationCount(3, 9)
	if seriesCount != 9 {
		t.Fatalf("expected series count to win, got %d", seriesCount)
	}
	zeroCount := cluster.TestHookEventObservationCount(0, 0)
	if zeroCount != 1 {
		t.Fatalf("expected zero-count normalization to 1, got %d", zeroCount)
	}

	// Last-observed timestamp: every row must resolve to `now`, which sits in the
	// first non-zero slot of the four arguments.
	now := time.Now().UTC().Round(time.Second)
	var unset time.Time
	observedCases := []struct {
		label  string
		stamps [4]time.Time
	}{
		{
			label:  "series timestamp priority",
			stamps: [4]time.Time{now, now.Add(-time.Minute), now.Add(-2 * time.Minute), now.Add(-3 * time.Minute)},
		},
		{
			label:  "lastTimestamp fallback",
			stamps: [4]time.Time{unset, now, now.Add(-time.Minute), now.Add(-2 * time.Minute)},
		},
		{
			label:  "eventTime fallback",
			stamps: [4]time.Time{unset, unset, now, now.Add(-time.Minute)},
		},
		{
			label:  "creationTimestamp fallback",
			stamps: [4]time.Time{unset, unset, unset, now},
		},
	}
	for _, tc := range observedCases {
		observed := cluster.TestHookEventLastObservedAt(tc.stamps[0], tc.stamps[1], tc.stamps[2], tc.stamps[3])
		if !observed.Equal(now) {
			t.Fatalf("expected %s, got %s", tc.label, observed)
		}
	}
}

// TestHookSchedulingStormQuarantine feeds the quarantine hook a stubbed cluster in
// which several Pending pods emit FailedScheduling warnings and checks that only
// the ai/ollama deployment is scaled to zero.
// Signature: TestHookSchedulingStormQuarantine(t *testing.T).
// Why: verifies that only non-core workloads generating real scheduling storms
// are auto-quarantined, which prevents event/Kine churn from spiking control-plane CPU.
func TestHookSchedulingStormQuarantine(t *testing.T) {
	now := time.Now().UTC().Format(time.RFC3339)

	cfg := lifecycleConfig(t)
	cfg.Startup.AutoQuarantineSchedulingStorms = true
	cfg.Startup.SchedulingStormEventThreshold = 30
	cfg.Startup.SchedulingStormWindowSeconds = 180
	cfg.Startup.WorkloadConvergenceRequiredNamespaces = []string{"vault"}
	cfg.Startup.IgnoreWorkloadNamespaces = []string{"ignored-ns"}
	cfg.Startup.IgnoreWorkloads = []string{"monitoring/deployment/ignore-me"}
	cfg.Startup.IgnoreUnavailableNodes = []string{"titan-22"}

	// Small builders for the kubectl JSON fixtures returned by the stub below.
	list := func(items ...string) string {
		return `{"items":[ ` + strings.Join(items, ", ") + ` ]}`
	}
	pod := func(namespace, name, ownerKind, ownerName, spec, phase string) string {
		return `{"metadata":{"namespace":"` + namespace + `","name":"` + name +
			`","ownerReferences":[{"kind":"` + ownerKind + `","name":"` + ownerName +
			`"}]},"spec":` + spec + `,"status":{"phase":"` + phase + `"}}`
	}
	replicaSet := func(namespace, name, deployment string) string {
		return `{"metadata":{"namespace":"` + namespace + `","name":"` + name +
			`","ownerReferences":[{"kind":"Deployment","name":"` + deployment + `"}]}}`
	}
	failedScheduling := func(stamp, namespace, podName, count string) string {
		return `{"metadata":{"creationTimestamp":"` + stamp +
			`"},"involvedObject":{"kind":"Pod","namespace":"` + namespace + `","name":"` + podName +
			`"},"type":"Warning","reason":"FailedScheduling","count":` + count + `}`
	}
	workload := func(kind, namespace, name string) string {
		return `{"kind":"` + kind + `","metadata":{"namespace":"` + namespace + `","name":"` + name +
			`"},"spec":{"replicas":1}}`
	}

	podsJSON := list(
		pod("ai", "ollama-pod", "ReplicaSet", "ollama-rs", `{}`, "Pending"),
		pod("vault", "vault-0", "StatefulSet", "vault", `{}`, "Pending"),
		pod("ignored-ns", "skip-pod", "ReplicaSet", "skip-rs", `{}`, "Pending"),
		pod("monitoring", "ignore-me-pod", "ReplicaSet", "ignore-me-rs", `{}`, "Pending"),
		pod("monitoring", "ignored-node-pod", "ReplicaSet", "ignored-node-rs", `{"nodeName":"titan-22"}`, "Pending"),
		pod("monitoring", "running-pod", "ReplicaSet", "running-rs", `{}`, "Running"),
	)
	replicaSetsJSON := list(
		replicaSet("ai", "ollama-rs", "ollama"),
		replicaSet("ignored-ns", "skip-rs", "skip"),
		replicaSet("monitoring", "ignore-me-rs", "ignore-me"),
		replicaSet("monitoring", "ignored-node-rs", "ignored-node"),
		replicaSet("monitoring", "running-rs", "running"),
	)
	eventsJSON := list(
		failedScheduling(now, "ai", "ollama-pod", "45"),
		failedScheduling(now, "vault", "vault-0", "45"),
		failedScheduling(now, "ignored-ns", "skip-pod", "45"),
		failedScheduling(now, "monitoring", "ignore-me-pod", "45"),
		failedScheduling(now, "monitoring", "ignored-node-pod", "45"),
		failedScheduling(now, "monitoring", "running-pod", "45"),
		// Event created in the year 2000 for a pod that is absent from the pod list.
		failedScheduling("2000-01-01T00:00:00Z", "monitoring", "stale-pod", "99"),
	)
	workloadsJSON := list(
		workload("Deployment", "ai", "ollama"),
		workload("StatefulSet", "vault", "vault"),
		workload("Deployment", "ignored-ns", "skip"),
		workload("Deployment", "monitoring", "ignore-me"),
		workload("Deployment", "monitoring", "ignored-node"),
		workload("Deployment", "monitoring", "running"),
	)

	scaledOllama := false
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		if name == "kubectl" {
			command := name + " " + strings.Join(args, " ")
			switch {
			case strings.Contains(command, "get pods -A -o json"):
				return podsJSON, nil
			case strings.Contains(command, "get replicasets -A -o json"):
				return replicaSetsJSON, nil
			case strings.Contains(command, "get events -A -o json"):
				return eventsJSON, nil
			case strings.Contains(command, "get deploy,statefulset -A -o json"):
				return workloadsJSON, nil
			case strings.Contains(command, "-n ai scale deployment ollama --replicas=0"):
				// Record that the scale-to-zero command for ollama was issued.
				scaledOllama = true
				return "", nil
			}
		}
		// Anything else is delegated to the shared lifecycle dispatcher.
		return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
	}

	orch, _ := newHookOrchestrator(t, cfg, run, run)
	orch.TestHookBeginStartupReport("scheduling-storm")
	defer orch.TestHookFinalizeStartupReport(nil)

	err := orch.TestHookQuarantineSchedulingStormWorkloads(context.Background())
	if err != nil {
		t.Fatalf("quarantine scheduling storm workloads: %v", err)
	}
	if !scaledOllama {
		t.Fatalf("expected ollama deployment to be scaled to zero")
	}

	progress := readStartupProgress(t, orch)
	if !strings.Contains(progress, "ollama") {
		t.Fatalf("expected startup progress to mention ollama quarantine, payload=%s", progress)
	}
	// None of these workloads may show up in the startup progress payload.
	for _, spared := range []string{"vault", "ignore-me", "ignored-node"} {
		if strings.Contains(progress, spared) {
			t.Fatalf("expected only the non-core eligible workload to be quarantined, payload=%s", progress)
		}
	}
}

// TestHookSchedulingStormTriggerGuards checks that the auto-quarantine trigger is
// skipped when the feature is disabled, in dry-run mode, and when rate-limited.
// Signature: TestHookSchedulingStormTriggerGuards(t *testing.T).
// Why: covers dry-run/disabled/rate-limit guards so the scheduler-storm auto-heal
// only activates when the cluster is actually suffering this exact failure mode.
func TestHookSchedulingStormTriggerGuards(t *testing.T) { cfgDisabled := lifecycleConfig(t) orchDisabled, _ := newHookOrchestrator(t, cfgDisabled, nil, nil) lastAttempt := time.Time{} orchDisabled.TestHookMaybeAutoQuarantineSchedulingStorms(context.Background(), &lastAttempt) if !lastAttempt.IsZero() { t.Fatalf("expected disabled scheduling-storm trigger to be skipped") } cfgDry := lifecycleConfig(t) cfgDry.Startup.AutoQuarantineSchedulingStorms = true orchDry := newDryRunHookOrchestrator(t, cfgDry, nil) orchDry.TestHookMaybeAutoQuarantineSchedulingStorms(context.Background(), &lastAttempt) if !lastAttempt.IsZero() { t.Fatalf("expected dry-run scheduling-storm trigger to be skipped") } cfgRate := lifecycleConfig(t) cfgRate.Startup.AutoQuarantineSchedulingStorms = true cfgRate.Startup.SchedulingStormEventThreshold = 5 cfgRate.Startup.SchedulingStormWindowSeconds = 60 recorder := &commandRecorder{} run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { recorder.record(name, args) command := name + " " + strings.Join(args, " ") switch { case name == "kubectl" && strings.Contains(command, "get pods -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get events -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"): return `{"items":[]}`, nil default: return lifecycleDispatcher(recorder)(ctx, timeout, name, args...) } } orchRate, _ := newHookOrchestrator(t, cfgRate, run, run) lastAttempt = time.Now() orchRate.TestHookMaybeAutoQuarantineSchedulingStorms(context.Background(), &lastAttempt) if recorder.contains("get pods -A -o json") { t.Fatalf("expected rate-limited scheduling-storm trigger to skip kubectl scans") } } // TestHookSchedulingStormTriggerAndNoOpBranches runs one orchestration or CLI step. 
// Signature: TestHookSchedulingStormTriggerAndNoOpBranches(t *testing.T). // Why: raises scheduling-storm branch coverage on the success/no-op paths so the // auto-heal only acts on genuine event storms and stays quiet otherwise. func TestHookSchedulingStormTriggerAndNoOpBranches(t *testing.T) { cfg := lifecycleConfig(t) cfg.Startup.AutoQuarantineSchedulingStorms = true cfg.Startup.SchedulingStormEventThreshold = 0 cfg.Startup.SchedulingStormWindowSeconds = 0 scanRan := false run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "kubectl" && strings.Contains(command, "get pods -A -o json"): scanRan = true return `{"items":[ {"metadata":{"namespace":"","name":"missing"}}, {"metadata":{"namespace":"monitoring","name":"no-owner"},"spec":{},"status":{"phase":"Pending"}}, {"metadata":{"namespace":"monitoring","name":"done","ownerReferences":[{"kind":"ReplicaSet","name":"done-rs"}]},"spec":{},"status":{"phase":"Running"}}, {"metadata":{"namespace":"monitoring","name":"zero-replicas","ownerReferences":[{"kind":"ReplicaSet","name":"zero-rs"}]},"spec":{},"status":{"phase":"Pending"}} ]}`, nil case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"): return `{"items":[ {"metadata":{"namespace":"","name":"bad-rs"}}, {"metadata":{"namespace":"monitoring","name":"done-rs","ownerReferences":[{"kind":"","name":"ignored"}]}}, {"metadata":{"namespace":"monitoring","name":"zero-rs","ownerReferences":[{"kind":"Deployment","name":"zero"}]}} ]}`, nil case name == "kubectl" && strings.Contains(command, "get events -A -o json"): return `{"items":[ {"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"normal"},"type":"Normal","reason":"FailedScheduling","count":99}, {"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + 
`"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"wrong-reason"},"type":"Warning","reason":"SomeOtherReason","count":99}, {"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Service","namespace":"monitoring","name":"wrong-kind"},"type":"Warning","reason":"FailedScheduling","count":99}, {"metadata":{"creationTimestamp":"2000-01-01T00:00:00Z"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"old"},"type":"Warning","reason":"FailedScheduling","count":99}, {"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"low-count"},"type":"Warning","reason":"FailedScheduling","count":1}, {"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"missing-pod"},"type":"Warning","reason":"FailedScheduling","count":99}, {"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"done"},"type":"Warning","reason":"FailedScheduling","count":99}, {"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"no-owner"},"type":"Warning","reason":"FailedScheduling","count":99}, {"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"zero-replicas"},"type":"Warning","reason":"FailedScheduling","count":99} ]}`, nil case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"): return `{"items":[ {"kind":"","metadata":{"namespace":"monitoring","name":"blank-kind"}}, {"kind":"Job","metadata":{"namespace":"monitoring","name":"unsupported"}}, {"kind":"Deployment","metadata":{"namespace":"monitoring","name":"zero"},"spec":{"replicas":0}} ]}`, nil 
default: return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...) } } orch, _ := newHookOrchestrator(t, cfg, run, run) orch.TestHookBeginStartupReport("scheduling-storm-noop") defer orch.TestHookFinalizeStartupReport(nil) lastAttempt := time.Time{} orch.TestHookMaybeAutoQuarantineSchedulingStorms(context.Background(), &lastAttempt) if lastAttempt.IsZero() { t.Fatalf("expected successful scheduling-storm trigger to update lastAttempt") } if !scanRan { t.Fatalf("expected scheduling-storm scan to execute") } progress := readStartupProgress(t, orch) if strings.Contains(progress, "quarantined scheduling storm workload") { t.Fatalf("expected no-op scheduling-storm scan to avoid auto-heal output, payload=%s", progress) } } // TestHookSchedulingStormErrorMatrix runs one orchestration or CLI step. // Signature: TestHookSchedulingStormErrorMatrix(t *testing.T). // Why: covers malformed/error response branches in the scheduling-storm scan so // Ananke can surface precise diagnostics when the API itself is part of the problem. 
func TestHookSchedulingStormErrorMatrix(t *testing.T) { cases := []struct { name string run func(context.Context, time.Duration, string, ...string) (string, error) wantErr string }{ { name: "pods-query-error", run: func(_ context.Context, _ time.Duration, name string, _ ...string) (string, error) { if name == "kubectl" { return "", errors.New("pods boom") } return "", nil }, wantErr: "query pods for scheduling storm scan", }, { name: "pods-decode-error", run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) { if name == "kubectl" && strings.Contains(strings.Join(args, " "), "get pods -A -o json") { return "{", nil } return `{"items":[]}`, nil }, wantErr: "decode pods for scheduling storm scan", }, { name: "replicasets-query-error", run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "kubectl" && strings.Contains(command, "get pods -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"): return "", errors.New("replicasets boom") default: return "", nil } }, wantErr: "query replicasets for scheduling storm scan", }, { name: "replicasets-decode-error", run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "kubectl" && strings.Contains(command, "get pods -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"): return "{", nil default: return `{"items":[]}`, nil } }, wantErr: "decode replicasets for scheduling storm scan", }, { name: "events-query-error", run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "kubectl" && strings.Contains(command, "get pods -A -o json"): return 
`{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get events -A -o json"): return "", errors.New("events boom") default: return "", nil } }, wantErr: "query events for scheduling storm scan", }, { name: "events-decode-error", run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "kubectl" && strings.Contains(command, "get pods -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get events -A -o json"): return "{", nil default: return `{"items":[]}`, nil } }, wantErr: "decode events for scheduling storm scan", }, { name: "workloads-query-error", run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "kubectl" && strings.Contains(command, "get pods -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get events -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"): return "", errors.New("workloads boom") default: return "", nil } }, wantErr: "query workloads for scheduling storm scan", }, { name: "workloads-decode-error", run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "kubectl" && strings.Contains(command, "get pods -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"): 
return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get events -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"): return "{", nil default: return "", nil } }, wantErr: "decode workloads for scheduling storm scan", }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { cfg := lifecycleConfig(t) cfg.Startup.AutoQuarantineSchedulingStorms = true orch, _ := newHookOrchestrator(t, cfg, tc.run, tc.run) err := orch.TestHookQuarantineSchedulingStormWorkloads(context.Background()) if err == nil || !strings.Contains(err.Error(), tc.wantErr) { t.Fatalf("expected error containing %q, got %v", tc.wantErr, err) } }) } } // TestHookSchedulingStormScaleError runs one orchestration or CLI step. // Signature: TestHookSchedulingStormScaleError(t *testing.T). // Why: covers the final error path where Ananke detects a real storm but cannot // scale the offending workload down. func TestHookSchedulingStormScaleError(t *testing.T) { now := time.Now().UTC().Format(time.RFC3339) cfg := lifecycleConfig(t) cfg.Startup.AutoQuarantineSchedulingStorms = true cfg.Startup.SchedulingStormEventThreshold = 5 cfg.Startup.SchedulingStormWindowSeconds = 60 run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "kubectl" && strings.Contains(command, "get pods -A -o json"): return `{"items":[{"metadata":{"namespace":"ai","name":"ollama-pod","ownerReferences":[{"kind":"ReplicaSet","name":"ollama-rs"}]},"spec":{},"status":{"phase":"Pending"}}]}`, nil case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"): return `{"items":[{"metadata":{"namespace":"ai","name":"ollama-rs","ownerReferences":[{"kind":"Deployment","name":"ollama"}]}}]}`, nil case name == "kubectl" && strings.Contains(command, "get events -A -o json"): return 
`{"items":[{"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"ai","name":"ollama-pod"},"type":"Warning","reason":"FailedScheduling","count":45}]}`, nil case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"): return `{"items":[{"kind":"Deployment","metadata":{"namespace":"ai","name":"ollama"},"spec":{"replicas":1}}]}`, nil case name == "kubectl" && strings.Contains(command, "-n ai scale deployment ollama --replicas=0"): return "", errors.New("scale denied") default: return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...) } } orch, _ := newHookOrchestrator(t, cfg, run, run) err := orch.TestHookQuarantineSchedulingStormWorkloads(context.Background()) if err == nil || !strings.Contains(err.Error(), "scale scheduling storm workload ai/deployment/ollama to 0") { t.Fatalf("expected scale error, got %v", err) } }