recovery(ananke): quarantine scheduling storm workloads

codex 2026-05-05 12:09:58 -03:00
parent b7f7486350
commit d225291c5d
12 changed files with 858 additions and 0 deletions


@@ -150,6 +150,9 @@ startup:
ignore_workloads: []
ignore_unavailable_nodes: []
auto_recycle_stuck_pods: true
auto_quarantine_scheduling_storms: false
scheduling_storm_event_threshold: 30
scheduling_storm_window_seconds: 180
stuck_pod_grace_seconds: 180
vault_unseal_key_file: /var/lib/ananke/vault-unseal.key
vault_unseal_breakglass_command: ""


@@ -282,6 +282,9 @@ startup:
ignore_workloads: []
ignore_unavailable_nodes: []
auto_recycle_stuck_pods: true
auto_quarantine_scheduling_storms: true
scheduling_storm_event_threshold: 30
scheduling_storm_window_seconds: 180
stuck_pod_grace_seconds: 180
vault_unseal_key_file: /var/lib/ananke/vault-unseal.key
vault_unseal_breakglass_command: "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new -i /home/tethys/.ssh/id_ed25519 -p 1122 brad@99.183.132.163 'cat ~/.ananke-breakglass/vault-unseal.key'"


@@ -282,6 +282,9 @@ startup:
ignore_workloads: []
ignore_unavailable_nodes: []
auto_recycle_stuck_pods: true
auto_quarantine_scheduling_storms: true
scheduling_storm_event_threshold: 30
scheduling_storm_window_seconds: 180
stuck_pod_grace_seconds: 180
vault_unseal_key_file: /var/lib/ananke/vault-unseal.key
vault_unseal_breakglass_command: "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new -i /home/atlas/.ssh/id_ed25519 -p 1122 brad@99.183.132.163 'cat ~/.ananke-breakglass/vault-unseal.key'"


@@ -0,0 +1,261 @@
package cluster
import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"time"
)
// maybeAutoQuarantineSchedulingStorms gates, rate-limits, and dispatches the scheduling-storm quarantine pass.
// Signature: (o *Orchestrator) maybeAutoQuarantineSchedulingStorms(ctx context.Context, lastAttempt *time.Time).
// Why: a non-core workload that cannot schedule can emit enough warning events to
// thrash the control plane datastore; quarantine keeps startup moving while
// preserving core services.
func (o *Orchestrator) maybeAutoQuarantineSchedulingStorms(ctx context.Context, lastAttempt *time.Time) {
if o.runner.DryRun || !o.cfg.Startup.AutoQuarantineSchedulingStorms {
return
}
now := time.Now()
if lastAttempt != nil && !lastAttempt.IsZero() && now.Sub(*lastAttempt) < 30*time.Second {
return
}
if lastAttempt != nil {
*lastAttempt = now
}
o.bestEffort("quarantine non-core scheduling storm workloads", func() error {
return o.quarantineSchedulingStormWorkloads(ctx)
})
}
// quarantineSchedulingStormWorkloads scans recent FailedScheduling events and scales offending non-core workloads to zero.
// Signature: (o *Orchestrator) quarantineSchedulingStormWorkloads(ctx context.Context) error.
// Why: limits startup-only mitigation to workloads proven to be generating a
// scheduling event storm, instead of scaling optional apps down blindly.
func (o *Orchestrator) quarantineSchedulingStormWorkloads(ctx context.Context) error {
podsOut, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json")
if err != nil {
return fmt.Errorf("query pods for scheduling storm scan: %w", err)
}
var pods podList
if err := json.Unmarshal([]byte(podsOut), &pods); err != nil {
return fmt.Errorf("decode pods for scheduling storm scan: %w", err)
}
rsOut, err := o.kubectl(ctx, 30*time.Second, "get", "replicasets", "-A", "-o", "json")
if err != nil {
return fmt.Errorf("query replicasets for scheduling storm scan: %w", err)
}
var rsList replicaSetList
if err := json.Unmarshal([]byte(rsOut), &rsList); err != nil {
return fmt.Errorf("decode replicasets for scheduling storm scan: %w", err)
}
eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json")
if err != nil {
return fmt.Errorf("query events for scheduling storm scan: %w", err)
}
var events eventList
if err := json.Unmarshal([]byte(eventsOut), &events); err != nil {
return fmt.Errorf("decode events for scheduling storm scan: %w", err)
}
workloadsOut, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset", "-A", "-o", "json")
if err != nil {
return fmt.Errorf("query workloads for scheduling storm scan: %w", err)
}
var workloads workloadList
if err := json.Unmarshal([]byte(workloadsOut), &workloads); err != nil {
return fmt.Errorf("decode workloads for scheduling storm scan: %w", err)
}
requiredNamespaces := o.startupRequiredWorkloadNamespaces()
ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces)
ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
ignoreRules := parseWorkloadIgnoreRules(o.cfg.Startup.IgnoreWorkloads)
eventThreshold := o.cfg.Startup.SchedulingStormEventThreshold
if eventThreshold <= 0 {
eventThreshold = 30
}
window := time.Duration(o.cfg.Startup.SchedulingStormWindowSeconds) * time.Second
if window <= 0 {
window = 3 * time.Minute
}
podsByKey := map[string]podResource{}
for _, pod := range pods.Items {
ns := strings.TrimSpace(pod.Metadata.Namespace)
name := strings.TrimSpace(pod.Metadata.Name)
if ns == "" || name == "" {
continue
}
podsByKey[ns+"/"+name] = pod
}
rsOwners := map[string]ownerReference{}
for _, rs := range rsList.Items {
ns := strings.TrimSpace(rs.Metadata.Namespace)
name := strings.TrimSpace(rs.Metadata.Name)
if ns == "" || name == "" {
continue
}
for _, owner := range rs.Metadata.OwnerReferences {
kind := strings.TrimSpace(owner.Kind)
ownerName := strings.TrimSpace(owner.Name)
if kind == "" || ownerName == "" {
continue
}
rsOwners[ns+"/"+name] = owner
break
}
}
workloadDesired := map[string]int32{}
for _, item := range workloads.Items {
kind := strings.ToLower(strings.TrimSpace(item.Kind))
ns := strings.TrimSpace(item.Metadata.Namespace)
name := strings.TrimSpace(item.Metadata.Name)
if kind == "" || ns == "" || name == "" {
continue
}
desired, _, ok := desiredReady(item)
if !ok {
continue
}
workloadDesired[ns+"/"+kind+"/"+name] = desired
}
quarantined := []string{}
seen := map[string]struct{}{}
now := time.Now()
for _, event := range events.Items {
if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") {
continue
}
if strings.TrimSpace(event.Reason) != "FailedScheduling" {
continue
}
if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") {
continue
}
lastSeen := eventLastObservedAt(event)
if !lastSeen.IsZero() && now.Sub(lastSeen) > window {
continue
}
count := eventObservationCount(event)
if count < eventThreshold {
continue
}
podKey := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name)
pod, ok := podsByKey[podKey]
if !ok {
continue
}
if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") {
continue
}
ns := strings.TrimSpace(pod.Metadata.Namespace)
if _, ok := requiredNamespaces[ns]; ok {
continue
}
if _, ok := ignoredNamespaces[ns]; ok {
continue
}
if workloadIgnored(ignoreRules, ns, "", strings.TrimSpace(pod.Metadata.Name)) {
continue
}
if podTargetsIgnoredNode(pod, ignoredNodes) {
continue
}
workload, ok := schedulingStormOwnerWorkload(pod, rsOwners)
if !ok {
continue
}
if workloadIgnored(ignoreRules, workload.Namespace, workload.Kind, workload.Name) {
continue
}
workloadKey := workload.Namespace + "/" + workload.Kind + "/" + workload.Name
if _, done := seen[workloadKey]; done {
continue
}
desired := workloadDesired[workloadKey]
if desired <= 0 {
continue
}
if err := o.ensureWorkloadReplicas(ctx, workload, 0); err != nil {
return fmt.Errorf("scale scheduling storm workload %s to 0: %w", workloadKey, err)
}
seen[workloadKey] = struct{}{}
quarantined = append(quarantined, fmt.Sprintf("%s events=%d window=%s", workloadKey, count, window))
}
if len(quarantined) == 0 {
return nil
}
sort.Strings(quarantined)
detail := fmt.Sprintf("quarantined scheduling storm workload(s): %s", joinLimited(quarantined, 8))
o.log.Printf("%s", detail)
o.noteStartupAutoHeal(detail)
return nil
}
// schedulingStormOwnerWorkload resolves a pod to the deployment or statefulset that owns it.
// Signature: schedulingStormOwnerWorkload(pod podResource, rsOwners map[string]ownerReference) (startupWorkload, bool).
// Why: scheduling storms happen at the pod layer, but safe mitigation needs to
// operate on the owning deployment or statefulset.
func schedulingStormOwnerWorkload(pod podResource, rsOwners map[string]ownerReference) (startupWorkload, bool) {
ns := strings.TrimSpace(pod.Metadata.Namespace)
for _, owner := range pod.Metadata.OwnerReferences {
switch strings.TrimSpace(owner.Kind) {
case "StatefulSet":
if name := strings.TrimSpace(owner.Name); name != "" {
return startupWorkload{Namespace: ns, Kind: "statefulset", Name: name}, true
}
case "ReplicaSet":
rsName := strings.TrimSpace(owner.Name)
if rsName == "" {
continue
}
rsOwner, ok := rsOwners[ns+"/"+rsName]
if !ok || strings.TrimSpace(rsOwner.Kind) != "Deployment" || strings.TrimSpace(rsOwner.Name) == "" {
continue
}
return startupWorkload{Namespace: ns, Kind: "deployment", Name: strings.TrimSpace(rsOwner.Name)}, true
}
}
return startupWorkload{}, false
}
// eventObservationCount normalizes an event's observation count across event API shapes.
// Signature: eventObservationCount(event eventResource) int.
// Why: event count can live either on the root event or in the series payload;
// using the max keeps detection stable across Kubernetes versions.
func eventObservationCount(event eventResource) int {
count := event.Count
if event.Series.Count > count {
count = event.Series.Count
}
if count < 1 {
return 1
}
return count
}
// eventLastObservedAt returns the most recent observation time recorded on an event.
// Signature: eventLastObservedAt(event eventResource) time.Time.
// Why: event recency fields vary by cluster version; prefer the newest explicit
// observation time and fall back to creation time when needed.
func eventLastObservedAt(event eventResource) time.Time {
switch {
case !event.Series.LastObservedTime.IsZero():
return event.Series.LastObservedTime
case !event.LastTimestamp.IsZero():
return event.LastTimestamp
case !event.EventTime.IsZero():
return event.EventTime
default:
return event.Metadata.CreationTimestamp
}
}
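Taken together, eventObservationCount and eventLastObservedAt reduce storm detection to a count-and-recency predicate. The following standalone sketch mirrors that predicate outside Ananke's types; stormy and its parameters are hypothetical names, and 30/180 are the defaults introduced by this commit.

package main

import (
	"fmt"
	"time"
)

// stormy mirrors the threshold/window checks in quarantineSchedulingStormWorkloads:
// an event qualifies only if it was seen recently enough and often enough.
func stormy(count int, lastSeen time.Time, threshold int, window time.Duration) bool {
	if !lastSeen.IsZero() && time.Since(lastSeen) > window {
		return false // stale: outside the recency window
	}
	return count >= threshold
}

func main() {
	threshold := 30             // scheduling_storm_event_threshold default
	window := 180 * time.Second // scheduling_storm_window_seconds default
	fmt.Println(stormy(45, time.Now().Add(-time.Minute), threshold, window)) // true: active storm
	fmt.Println(stormy(99, time.Now().Add(-time.Hour), threshold, window))   // false: stale event
	fmt.Println(stormy(5, time.Now(), threshold, window))                    // false: below threshold
}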


@@ -177,6 +177,46 @@ type jobConditionRef struct {
Status string `json:"status"`
}
type eventList struct {
Items []eventResource `json:"items"`
}
type eventResource struct {
Metadata struct {
Namespace string `json:"namespace"`
CreationTimestamp time.Time `json:"creationTimestamp"`
} `json:"metadata"`
InvolvedObject struct {
Kind string `json:"kind"`
Namespace string `json:"namespace"`
Name string `json:"name"`
} `json:"involvedObject"`
Type string `json:"type"`
Reason string `json:"reason"`
Message string `json:"message"`
Count int `json:"count"`
EventTime time.Time `json:"eventTime"`
LastTimestamp time.Time `json:"lastTimestamp"`
Series eventSeries `json:"series"`
}
type eventSeries struct {
Count int `json:"count"`
LastObservedTime time.Time `json:"lastObservedTime"`
}
type replicaSetList struct {
Items []replicaSetResource `json:"items"`
}
type replicaSetResource struct {
Metadata struct {
Namespace string `json:"namespace"`
Name string `json:"name"`
OwnerReferences []ownerReference `json:"ownerReferences"`
} `json:"metadata"`
}
type workloadResource struct {
Kind string `json:"kind"`
Metadata struct {
@@ -221,6 +261,7 @@ type podResource struct {
type ownerReference struct {
Kind string `json:"kind"`
Name string `json:"name"`
}
type podContainerStatus struct {
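The event fields above cover both shapes Kubernetes emits: core/v1 events carry count/lastTimestamp, while events.k8s.io events carry series.count/series.lastObservedTime. A standalone decoding sketch, using a trimmed local type (not Ananke's eventResource) and a made-up sample payload:

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// event copies the JSON tags from the eventResource diff above, trimmed to
// the count/recency fields; it is illustrative only.
type event struct {
	Type          string    `json:"type"`
	Reason        string    `json:"reason"`
	Count         int       `json:"count"`
	LastTimestamp time.Time `json:"lastTimestamp"`
	Series        struct {
		Count            int       `json:"count"`
		LastObservedTime time.Time `json:"lastObservedTime"`
	} `json:"series"`
}

func main() {
	// events.k8s.io-style payload: the root count is low, the series count is not.
	raw := []byte(`{"type":"Warning","reason":"FailedScheduling","count":3,
		"series":{"count":45,"lastObservedTime":"2026-05-05T12:00:00Z"}}`)
	var e event
	if err := json.Unmarshal(raw, &e); err != nil {
		panic(err)
	}
	// Taking the max of the two counts, as eventObservationCount does, keeps
	// detection stable regardless of which shape the cluster emits.
	n := e.Count
	if e.Series.Count > n {
		n = e.Series.Count
	}
	fmt.Printf("%s observed %d times\n", e.Reason, n) // FailedScheduling observed 45 times
}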


@@ -26,10 +26,12 @@ func (o *Orchestrator) waitForWorkloadConvergence(ctx context.Context) error {
lastLogged := time.Time{}
lastRecycleAttempt := time.Time{}
lastReplicaHeal := time.Time{}
lastSchedulingStormHeal := time.Time{}
for {
prevFailure := lastFailure
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
o.maybeAutoQuarantineSchedulingStorms(ctx, &lastSchedulingStormHeal)
ready, detail, err := o.workloadConvergenceReady(ctx)
if err != nil {
lastFailure = err.Error()


@@ -0,0 +1,88 @@
package cluster
import (
"context"
"fmt"
"strings"
"time"
)
// TestHookMaybeAutoQuarantineSchedulingStorms forwards to maybeAutoQuarantineSchedulingStorms for tests.
// Signature: (o *Orchestrator) TestHookMaybeAutoQuarantineSchedulingStorms(ctx context.Context, lastAttempt *time.Time).
// Why: exposes the scheduling-storm trigger guard to the split top-level test module.
func (o *Orchestrator) TestHookMaybeAutoQuarantineSchedulingStorms(ctx context.Context, lastAttempt *time.Time) {
o.maybeAutoQuarantineSchedulingStorms(ctx, lastAttempt)
}
// TestHookQuarantineSchedulingStormWorkloads forwards to quarantineSchedulingStormWorkloads for tests.
// Signature: (o *Orchestrator) TestHookQuarantineSchedulingStormWorkloads(ctx context.Context) error.
// Why: exposes the scheduling-storm auto-heal body to the split top-level test module.
func (o *Orchestrator) TestHookQuarantineSchedulingStormWorkloads(ctx context.Context) error {
return o.quarantineSchedulingStormWorkloads(ctx)
}
// TestHookSchedulingStormOwnerWorkload forwards to schedulingStormOwnerWorkload for tests.
// Signature: TestHookSchedulingStormOwnerWorkload(namespace string, ownerKind string, ownerName string, rsOwnerKind string, rsOwnerName string) (string, bool).
// Why: exposes owner-resolution behavior without leaking internal workload types.
func TestHookSchedulingStormOwnerWorkload(
namespace string,
ownerKind string,
ownerName string,
rsOwnerKind string,
rsOwnerName string,
) (string, bool) {
var pod podResource
pod.Metadata.Namespace = strings.TrimSpace(namespace)
pod.Metadata.OwnerReferences = []ownerReference{{
Kind: strings.TrimSpace(ownerKind),
Name: strings.TrimSpace(ownerName),
}}
rsOwners := map[string]ownerReference{}
if rsName := strings.TrimSpace(ownerName); rsName != "" && strings.TrimSpace(ownerKind) == "ReplicaSet" {
rsOwners[pod.Metadata.Namespace+"/"+rsName] = ownerReference{
Kind: strings.TrimSpace(rsOwnerKind),
Name: strings.TrimSpace(rsOwnerName),
}
}
workload, ok := schedulingStormOwnerWorkload(pod, rsOwners)
if !ok {
return "", false
}
return fmt.Sprintf("%s/%s/%s", workload.Namespace, workload.Kind, workload.Name), true
}
// TestHookEventObservationCount forwards to eventObservationCount for tests.
// Signature: TestHookEventObservationCount(count int, seriesCount int) int.
// Why: exposes event-count normalization used by scheduling-storm detection.
func TestHookEventObservationCount(count int, seriesCount int) int {
return eventObservationCount(eventResource{
Count: count,
Series: eventSeries{
Count: seriesCount,
},
})
}
// TestHookEventLastObservedAt forwards to eventLastObservedAt for tests.
// Signature: TestHookEventLastObservedAt(seriesLastObserved time.Time, lastTimestamp time.Time, eventTime time.Time, creationTimestamp time.Time) time.Time.
// Why: exposes event-time fallback behavior used by scheduling-storm detection.
func TestHookEventLastObservedAt(
seriesLastObserved time.Time,
lastTimestamp time.Time,
eventTime time.Time,
creationTimestamp time.Time,
) time.Time {
return eventLastObservedAt(eventResource{
LastTimestamp: lastTimestamp,
EventTime: eventTime,
Series: eventSeries{
LastObservedTime: seriesLastObserved,
},
Metadata: struct {
Namespace string `json:"namespace"`
CreationTimestamp time.Time `json:"creationTimestamp"`
}{
CreationTimestamp: creationTimestamp,
},
})
}


@@ -237,6 +237,12 @@ func (c *Config) applyDefaults() {
if c.UPS.TelemetryTimeoutSeconds <= 0 {
c.UPS.TelemetryTimeoutSeconds = 90
}
if c.Startup.SchedulingStormEventThreshold <= 0 {
c.Startup.SchedulingStormEventThreshold = 30
}
if c.Startup.SchedulingStormWindowSeconds <= 0 {
c.Startup.SchedulingStormWindowSeconds = 180
}
if c.Coordination.ForwardShutdownConfig == "" { if c.Coordination.ForwardShutdownConfig == "" {
c.Coordination.ForwardShutdownConfig = "/etc/ananke/ananke.yaml" c.Coordination.ForwardShutdownConfig = "/etc/ananke/ananke.yaml"
} }


@@ -87,6 +87,9 @@ type Startup struct {
IgnoreWorkloads []string `yaml:"ignore_workloads"`
IgnoreUnavailableNodes []string `yaml:"ignore_unavailable_nodes"`
AutoRecycleStuckPods bool `yaml:"auto_recycle_stuck_pods"`
AutoQuarantineSchedulingStorms bool `yaml:"auto_quarantine_scheduling_storms"`
SchedulingStormEventThreshold int `yaml:"scheduling_storm_event_threshold"`
SchedulingStormWindowSeconds int `yaml:"scheduling_storm_window_seconds"`
StuckPodGraceSeconds int `yaml:"stuck_pod_grace_seconds"`
VaultUnsealKeyFile string `yaml:"vault_unseal_key_file"`
VaultUnsealBreakglassCommand string `yaml:"vault_unseal_breakglass_command"`
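The yaml tags above map the new keys one-to-one onto the Startup struct. A standalone sketch of that mapping with gopkg.in/yaml.v3, using a trimmed local type (stormStartup is illustrative, not Ananke's):

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// stormStartup trims Startup down to the three fields added in this commit,
// reusing the same yaml tags.
type stormStartup struct {
	AutoQuarantineSchedulingStorms bool `yaml:"auto_quarantine_scheduling_storms"`
	SchedulingStormEventThreshold  int  `yaml:"scheduling_storm_event_threshold"`
	SchedulingStormWindowSeconds   int  `yaml:"scheduling_storm_window_seconds"`
}

func main() {
	raw := []byte(`
auto_quarantine_scheduling_storms: true
scheduling_storm_event_threshold: 30
scheduling_storm_window_seconds: 180
`)
	var s stormStartup
	if err := yaml.Unmarshal(raw, &s); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", s) // {AutoQuarantineSchedulingStorms:true SchedulingStormEventThreshold:30 SchedulingStormWindowSeconds:180}
}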


@@ -343,6 +343,14 @@ func (c Config) Validate() error {
return fmt.Errorf("config.coordination.forward_shutdown_config must not be empty when forward_shutdown_host is set")
}
}
if c.Startup.AutoQuarantineSchedulingStorms {
if c.Startup.SchedulingStormEventThreshold <= 0 {
return fmt.Errorf("config.startup.scheduling_storm_event_threshold must be > 0 when auto_quarantine_scheduling_storms is enabled")
}
if c.Startup.SchedulingStormWindowSeconds <= 0 {
return fmt.Errorf("config.startup.scheduling_storm_window_seconds must be > 0 when auto_quarantine_scheduling_storms is enabled")
}
}
for _, peer := range c.Coordination.PeerHosts {
if strings.TrimSpace(peer) == "" {
return fmt.Errorf("config.coordination.peer_hosts entries must not be empty")


@@ -92,6 +92,14 @@ func TestValidateRejectsInvalidFieldsMatrix(t *testing.T) {
}},
{"bad_empty_ignore_unavailable_nodes_entry", func(c *Config) { c.Startup.IgnoreUnavailableNodes = []string{"", "titan-22"} }},
{"bad_empty_vault_key_file", func(c *Config) { c.Startup.VaultUnsealKeyFile = "" }},
{"bad_scheduling_storm_threshold", func(c *Config) {
c.Startup.AutoQuarantineSchedulingStorms = true
c.Startup.SchedulingStormEventThreshold = 0
}},
{"bad_scheduling_storm_window", func(c *Config) {
c.Startup.AutoQuarantineSchedulingStorms = true
c.Startup.SchedulingStormWindowSeconds = 0
}},
{"bad_ssh_port_low", func(c *Config) { c.SSHPort = 0 }}, {"bad_ssh_port_low", func(c *Config) { c.SSHPort = 0 }},
{"bad_ups_provider", func(c *Config) { c.UPS.Enabled = true; c.UPS.Provider = "" }}, {"bad_ups_provider", func(c *Config) { c.UPS.Enabled = true; c.UPS.Provider = "" }},
{"bad_ups_on_battery_grace_negative", func(c *Config) { c.UPS.Enabled = true; c.UPS.OnBatteryGraceSeconds = -1 }}, {"bad_ups_on_battery_grace_negative", func(c *Config) { c.UPS.Enabled = true; c.UPS.OnBatteryGraceSeconds = -1 }},


@@ -0,0 +1,432 @@
package orchestrator
import (
"context"
"errors"
"strings"
"testing"
"time"
"scm.bstein.dev/bstein/ananke/internal/cluster"
)
// TestHookSchedulingStormHelpers exercises the scheduling-storm helper functions.
// Signature: TestHookSchedulingStormHelpers(t *testing.T).
// Why: keeps scheduling-storm helper coverage in the split top-level testing module
// required by the repo hygiene contract.
func TestHookSchedulingStormHelpers(t *testing.T) {
if got, ok := cluster.TestHookSchedulingStormOwnerWorkload("ai", "ReplicaSet", "ollama-rs", "Deployment", "ollama"); !ok || got != "ai/deployment/ollama" {
t.Fatalf("unexpected deployment owner resolution: got=%q ok=%v", got, ok)
}
if got, ok := cluster.TestHookSchedulingStormOwnerWorkload("storage", "StatefulSet", "nextcloud", "", ""); !ok || got != "storage/statefulset/nextcloud" {
t.Fatalf("unexpected statefulset owner resolution: got=%q ok=%v", got, ok)
}
if got, ok := cluster.TestHookSchedulingStormOwnerWorkload("ai", "ReplicaSet", "missing", "", ""); ok || got != "" {
t.Fatalf("expected missing replicaset owner lookup to fail, got=%q ok=%v", got, ok)
}
if got := cluster.TestHookEventObservationCount(3, 9); got != 9 {
t.Fatalf("expected series count to win, got %d", got)
}
if got := cluster.TestHookEventObservationCount(0, 0); got != 1 {
t.Fatalf("expected zero-count normalization to 1, got %d", got)
}
now := time.Now().UTC().Round(time.Second)
if got := cluster.TestHookEventLastObservedAt(now, now.Add(-time.Minute), now.Add(-2*time.Minute), now.Add(-3*time.Minute)); !got.Equal(now) {
t.Fatalf("expected series timestamp priority, got %s", got)
}
if got := cluster.TestHookEventLastObservedAt(time.Time{}, now, now.Add(-time.Minute), now.Add(-2*time.Minute)); !got.Equal(now) {
t.Fatalf("expected lastTimestamp fallback, got %s", got)
}
if got := cluster.TestHookEventLastObservedAt(time.Time{}, time.Time{}, now, now.Add(-time.Minute)); !got.Equal(now) {
t.Fatalf("expected eventTime fallback, got %s", got)
}
if got := cluster.TestHookEventLastObservedAt(time.Time{}, time.Time{}, time.Time{}, now); !got.Equal(now) {
t.Fatalf("expected creationTimestamp fallback, got %s", got)
}
}
// TestHookSchedulingStormQuarantine exercises the end-to-end quarantine scan.
// Signature: TestHookSchedulingStormQuarantine(t *testing.T).
// Why: verifies that only non-core workloads generating real scheduling storms
// are auto-quarantined, which prevents event/Kine churn from spiking control-plane CPU.
func TestHookSchedulingStormQuarantine(t *testing.T) {
now := time.Now().UTC().Format(time.RFC3339)
cfg := lifecycleConfig(t)
cfg.Startup.AutoQuarantineSchedulingStorms = true
cfg.Startup.SchedulingStormEventThreshold = 30
cfg.Startup.SchedulingStormWindowSeconds = 180
cfg.Startup.WorkloadConvergenceRequiredNamespaces = []string{"vault"}
cfg.Startup.IgnoreWorkloadNamespaces = []string{"ignored-ns"}
cfg.Startup.IgnoreWorkloads = []string{"monitoring/deployment/ignore-me"}
cfg.Startup.IgnoreUnavailableNodes = []string{"titan-22"}
scaledOllama := false
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
return `{"items":[
{"metadata":{"namespace":"ai","name":"ollama-pod","ownerReferences":[{"kind":"ReplicaSet","name":"ollama-rs"}]},"spec":{},"status":{"phase":"Pending"}},
{"metadata":{"namespace":"vault","name":"vault-0","ownerReferences":[{"kind":"StatefulSet","name":"vault"}]},"spec":{},"status":{"phase":"Pending"}},
{"metadata":{"namespace":"ignored-ns","name":"skip-pod","ownerReferences":[{"kind":"ReplicaSet","name":"skip-rs"}]},"spec":{},"status":{"phase":"Pending"}},
{"metadata":{"namespace":"monitoring","name":"ignore-me-pod","ownerReferences":[{"kind":"ReplicaSet","name":"ignore-me-rs"}]},"spec":{},"status":{"phase":"Pending"}},
{"metadata":{"namespace":"monitoring","name":"ignored-node-pod","ownerReferences":[{"kind":"ReplicaSet","name":"ignored-node-rs"}]},"spec":{"nodeName":"titan-22"},"status":{"phase":"Pending"}},
{"metadata":{"namespace":"monitoring","name":"running-pod","ownerReferences":[{"kind":"ReplicaSet","name":"running-rs"}]},"spec":{},"status":{"phase":"Running"}}
]}`, nil
case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
return `{"items":[
{"metadata":{"namespace":"ai","name":"ollama-rs","ownerReferences":[{"kind":"Deployment","name":"ollama"}]}},
{"metadata":{"namespace":"ignored-ns","name":"skip-rs","ownerReferences":[{"kind":"Deployment","name":"skip"}]}},
{"metadata":{"namespace":"monitoring","name":"ignore-me-rs","ownerReferences":[{"kind":"Deployment","name":"ignore-me"}]}},
{"metadata":{"namespace":"monitoring","name":"ignored-node-rs","ownerReferences":[{"kind":"Deployment","name":"ignored-node"}]}},
{"metadata":{"namespace":"monitoring","name":"running-rs","ownerReferences":[{"kind":"Deployment","name":"running"}]}}
]}`, nil
case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
return `{"items":[
{"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"ai","name":"ollama-pod"},"type":"Warning","reason":"FailedScheduling","count":45},
{"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"vault","name":"vault-0"},"type":"Warning","reason":"FailedScheduling","count":45},
{"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"ignored-ns","name":"skip-pod"},"type":"Warning","reason":"FailedScheduling","count":45},
{"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"ignore-me-pod"},"type":"Warning","reason":"FailedScheduling","count":45},
{"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"ignored-node-pod"},"type":"Warning","reason":"FailedScheduling","count":45},
{"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"running-pod"},"type":"Warning","reason":"FailedScheduling","count":45},
{"metadata":{"creationTimestamp":"2000-01-01T00:00:00Z"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"stale-pod"},"type":"Warning","reason":"FailedScheduling","count":99}
]}`, nil
case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
return `{"items":[
{"kind":"Deployment","metadata":{"namespace":"ai","name":"ollama"},"spec":{"replicas":1}},
{"kind":"StatefulSet","metadata":{"namespace":"vault","name":"vault"},"spec":{"replicas":1}},
{"kind":"Deployment","metadata":{"namespace":"ignored-ns","name":"skip"},"spec":{"replicas":1}},
{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"ignore-me"},"spec":{"replicas":1}},
{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"ignored-node"},"spec":{"replicas":1}},
{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"running"},"spec":{"replicas":1}}
]}`, nil
case name == "kubectl" && strings.Contains(command, "-n ai scale deployment ollama --replicas=0"):
scaledOllama = true
return "", nil
default:
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
}
orch, _ := newHookOrchestrator(t, cfg, run, run)
orch.TestHookBeginStartupReport("scheduling-storm")
defer orch.TestHookFinalizeStartupReport(nil)
if err := orch.TestHookQuarantineSchedulingStormWorkloads(context.Background()); err != nil {
t.Fatalf("quarantine scheduling storm workloads: %v", err)
}
if !scaledOllama {
t.Fatalf("expected ollama deployment to be scaled to zero")
}
progress := readStartupProgress(t, orch)
if !strings.Contains(progress, "ollama") {
t.Fatalf("expected startup progress to mention ollama quarantine, payload=%s", progress)
}
if strings.Contains(progress, "vault") || strings.Contains(progress, "ignore-me") || strings.Contains(progress, "ignored-node") {
t.Fatalf("expected only the non-core eligible workload to be quarantined, payload=%s", progress)
}
}
// TestHookSchedulingStormTriggerGuards exercises the trigger's guard branches.
// Signature: TestHookSchedulingStormTriggerGuards(t *testing.T).
// Why: covers dry-run/disabled/rate-limit guards so the scheduling-storm auto-heal
// only activates when the cluster is actually suffering this exact failure mode.
func TestHookSchedulingStormTriggerGuards(t *testing.T) {
cfgDisabled := lifecycleConfig(t)
orchDisabled, _ := newHookOrchestrator(t, cfgDisabled, nil, nil)
lastAttempt := time.Time{}
orchDisabled.TestHookMaybeAutoQuarantineSchedulingStorms(context.Background(), &lastAttempt)
if !lastAttempt.IsZero() {
t.Fatalf("expected disabled scheduling-storm trigger to be skipped")
}
cfgDry := lifecycleConfig(t)
cfgDry.Startup.AutoQuarantineSchedulingStorms = true
orchDry := newDryRunHookOrchestrator(t, cfgDry, nil)
orchDry.TestHookMaybeAutoQuarantineSchedulingStorms(context.Background(), &lastAttempt)
if !lastAttempt.IsZero() {
t.Fatalf("expected dry-run scheduling-storm trigger to be skipped")
}
cfgRate := lifecycleConfig(t)
cfgRate.Startup.AutoQuarantineSchedulingStorms = true
cfgRate.Startup.SchedulingStormEventThreshold = 5
cfgRate.Startup.SchedulingStormWindowSeconds = 60
recorder := &commandRecorder{}
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
recorder.record(name, args)
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
return `{"items":[]}`, nil
default:
return lifecycleDispatcher(recorder)(ctx, timeout, name, args...)
}
}
orchRate, _ := newHookOrchestrator(t, cfgRate, run, run)
lastAttempt = time.Now()
orchRate.TestHookMaybeAutoQuarantineSchedulingStorms(context.Background(), &lastAttempt)
if recorder.contains("get pods -A -o json") {
t.Fatalf("expected rate-limited scheduling-storm trigger to skip kubectl scans")
}
}
// TestHookSchedulingStormTriggerAndNoOpBranches exercises the success and no-op branches.
// Signature: TestHookSchedulingStormTriggerAndNoOpBranches(t *testing.T).
// Why: raises scheduling-storm branch coverage on the success/no-op paths so the
// auto-heal only acts on genuine event storms and stays quiet otherwise.
func TestHookSchedulingStormTriggerAndNoOpBranches(t *testing.T) {
cfg := lifecycleConfig(t)
cfg.Startup.AutoQuarantineSchedulingStorms = true
cfg.Startup.SchedulingStormEventThreshold = 0
cfg.Startup.SchedulingStormWindowSeconds = 0
scanRan := false
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
scanRan = true
return `{"items":[
{"metadata":{"namespace":"","name":"missing"}},
{"metadata":{"namespace":"monitoring","name":"no-owner"},"spec":{},"status":{"phase":"Pending"}},
{"metadata":{"namespace":"monitoring","name":"done","ownerReferences":[{"kind":"ReplicaSet","name":"done-rs"}]},"spec":{},"status":{"phase":"Running"}},
{"metadata":{"namespace":"monitoring","name":"zero-replicas","ownerReferences":[{"kind":"ReplicaSet","name":"zero-rs"}]},"spec":{},"status":{"phase":"Pending"}}
]}`, nil
case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
return `{"items":[
{"metadata":{"namespace":"","name":"bad-rs"}},
{"metadata":{"namespace":"monitoring","name":"done-rs","ownerReferences":[{"kind":"","name":"ignored"}]}},
{"metadata":{"namespace":"monitoring","name":"zero-rs","ownerReferences":[{"kind":"Deployment","name":"zero"}]}}
]}`, nil
case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
return `{"items":[
{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"normal"},"type":"Normal","reason":"FailedScheduling","count":99},
{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"wrong-reason"},"type":"Warning","reason":"SomeOtherReason","count":99},
{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Service","namespace":"monitoring","name":"wrong-kind"},"type":"Warning","reason":"FailedScheduling","count":99},
{"metadata":{"creationTimestamp":"2000-01-01T00:00:00Z"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"old"},"type":"Warning","reason":"FailedScheduling","count":99},
{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"low-count"},"type":"Warning","reason":"FailedScheduling","count":1},
{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"missing-pod"},"type":"Warning","reason":"FailedScheduling","count":99},
{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"done"},"type":"Warning","reason":"FailedScheduling","count":99},
{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"no-owner"},"type":"Warning","reason":"FailedScheduling","count":99},
{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"zero-replicas"},"type":"Warning","reason":"FailedScheduling","count":99}
]}`, nil
case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
return `{"items":[
{"kind":"","metadata":{"namespace":"monitoring","name":"blank-kind"}},
{"kind":"Job","metadata":{"namespace":"monitoring","name":"unsupported"}},
{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"zero"},"spec":{"replicas":0}}
]}`, nil
default:
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
}
orch, _ := newHookOrchestrator(t, cfg, run, run)
orch.TestHookBeginStartupReport("scheduling-storm-noop")
defer orch.TestHookFinalizeStartupReport(nil)
lastAttempt := time.Time{}
orch.TestHookMaybeAutoQuarantineSchedulingStorms(context.Background(), &lastAttempt)
if lastAttempt.IsZero() {
t.Fatalf("expected successful scheduling-storm trigger to update lastAttempt")
}
if !scanRan {
t.Fatalf("expected scheduling-storm scan to execute")
}
progress := readStartupProgress(t, orch)
if strings.Contains(progress, "quarantined scheduling storm workload") {
t.Fatalf("expected no-op scheduling-storm scan to avoid auto-heal output, payload=%s", progress)
}
}
// TestHookSchedulingStormErrorMatrix exercises the scan's query and decode failure branches.
// Signature: TestHookSchedulingStormErrorMatrix(t *testing.T).
// Why: covers malformed/error response branches in the scheduling-storm scan so
// Ananke can surface precise diagnostics when the API itself is part of the problem.
func TestHookSchedulingStormErrorMatrix(t *testing.T) {
cases := []struct {
name string
run func(context.Context, time.Duration, string, ...string) (string, error)
wantErr string
}{
{
name: "pods-query-error",
run: func(_ context.Context, _ time.Duration, name string, _ ...string) (string, error) {
if name == "kubectl" {
return "", errors.New("pods boom")
}
return "", nil
},
wantErr: "query pods for scheduling storm scan",
},
{
name: "pods-decode-error",
run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
if name == "kubectl" && strings.Contains(strings.Join(args, " "), "get pods -A -o json") {
return "{", nil
}
return `{"items":[]}`, nil
},
wantErr: "decode pods for scheduling storm scan",
},
{
name: "replicasets-query-error",
run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
return "", errors.New("replicasets boom")
default:
return "", nil
}
},
wantErr: "query replicasets for scheduling storm scan",
},
{
name: "replicasets-decode-error",
run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
return "{", nil
default:
return `{"items":[]}`, nil
}
},
wantErr: "decode replicasets for scheduling storm scan",
},
{
name: "events-query-error",
run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
return "", errors.New("events boom")
default:
return "", nil
}
},
wantErr: "query events for scheduling storm scan",
},
{
name: "events-decode-error",
run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
return "{", nil
default:
return `{"items":[]}`, nil
}
},
wantErr: "decode events for scheduling storm scan",
},
{
name: "workloads-query-error",
run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
return "", errors.New("workloads boom")
default:
return "", nil
}
},
wantErr: "query workloads for scheduling storm scan",
},
{
name: "workloads-decode-error",
run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
return `{"items":[]}`, nil
case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
return "{", nil
default:
return "", nil
}
},
wantErr: "decode workloads for scheduling storm scan",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cfg := lifecycleConfig(t)
cfg.Startup.AutoQuarantineSchedulingStorms = true
orch, _ := newHookOrchestrator(t, cfg, tc.run, tc.run)
err := orch.TestHookQuarantineSchedulingStormWorkloads(context.Background())
if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
t.Fatalf("expected error containing %q, got %v", tc.wantErr, err)
}
})
}
}
// TestHookSchedulingStormScaleError exercises the scale-down failure path.
// Signature: TestHookSchedulingStormScaleError(t *testing.T).
// Why: covers the final error path where Ananke detects a real storm but cannot
// scale the offending workload down.
func TestHookSchedulingStormScaleError(t *testing.T) {
now := time.Now().UTC().Format(time.RFC3339)
cfg := lifecycleConfig(t)
cfg.Startup.AutoQuarantineSchedulingStorms = true
cfg.Startup.SchedulingStormEventThreshold = 5
cfg.Startup.SchedulingStormWindowSeconds = 60
run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
command := name + " " + strings.Join(args, " ")
switch {
case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
return `{"items":[{"metadata":{"namespace":"ai","name":"ollama-pod","ownerReferences":[{"kind":"ReplicaSet","name":"ollama-rs"}]},"spec":{},"status":{"phase":"Pending"}}]}`, nil
case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
return `{"items":[{"metadata":{"namespace":"ai","name":"ollama-rs","ownerReferences":[{"kind":"Deployment","name":"ollama"}]}}]}`, nil
case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
return `{"items":[{"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"ai","name":"ollama-pod"},"type":"Warning","reason":"FailedScheduling","count":45}]}`, nil
case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
return `{"items":[{"kind":"Deployment","metadata":{"namespace":"ai","name":"ollama"},"spec":{"replicas":1}}]}`, nil
case name == "kubectl" && strings.Contains(command, "-n ai scale deployment ollama --replicas=0"):
return "", errors.New("scale denied")
default:
return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
}
}
orch, _ := newHookOrchestrator(t, cfg, run, run)
err := orch.TestHookQuarantineSchedulingStormWorkloads(context.Background())
if err == nil || !strings.Contains(err.Error(), "scale scheduling storm workload ai/deployment/ollama to 0") {
t.Fatalf("expected scale error, got %v", err)
}
}