From d225291c5d8e000ab8f16353e2ffd96ca664570a Mon Sep 17 00:00:00 2001 From: codex Date: Tue, 5 May 2026 12:09:58 -0300 Subject: [PATCH] recovery(ananke): quarantine scheduling storm workloads --- configs/ananke.example.yaml | 3 + configs/ananke.tethys.yaml | 3 + configs/ananke.titan-db.yaml | 3 + .../cluster/orchestrator_scheduling_storm.go | 261 +++++++++++ .../cluster/orchestrator_storage_types.go | 41 ++ .../orchestrator_workload_convergence.go | 2 + .../cluster/testing_hooks_scheduling_storm.go | 88 ++++ internal/config/apply_defaults.go | 6 + internal/config/types.go | 3 + internal/config/validate.go | 8 + internal/config/validate_matrix_test.go | 8 + .../hooks_scheduling_storm_test.go | 432 ++++++++++++++++++ 12 files changed, 858 insertions(+) create mode 100644 internal/cluster/orchestrator_scheduling_storm.go create mode 100644 internal/cluster/testing_hooks_scheduling_storm.go create mode 100644 testing/orchestrator/hooks_scheduling_storm_test.go diff --git a/configs/ananke.example.yaml b/configs/ananke.example.yaml index 1472e48..5ea1867 100644 --- a/configs/ananke.example.yaml +++ b/configs/ananke.example.yaml @@ -150,6 +150,9 @@ startup: ignore_workloads: [] ignore_unavailable_nodes: [] auto_recycle_stuck_pods: true + auto_quarantine_scheduling_storms: false + scheduling_storm_event_threshold: 30 + scheduling_storm_window_seconds: 180 stuck_pod_grace_seconds: 180 vault_unseal_key_file: /var/lib/ananke/vault-unseal.key vault_unseal_breakglass_command: "" diff --git a/configs/ananke.tethys.yaml b/configs/ananke.tethys.yaml index 2c08b0e..2588d51 100644 --- a/configs/ananke.tethys.yaml +++ b/configs/ananke.tethys.yaml @@ -282,6 +282,9 @@ startup: ignore_workloads: [] ignore_unavailable_nodes: [] auto_recycle_stuck_pods: true + auto_quarantine_scheduling_storms: true + scheduling_storm_event_threshold: 30 + scheduling_storm_window_seconds: 180 stuck_pod_grace_seconds: 180 vault_unseal_key_file: /var/lib/ananke/vault-unseal.key 
vault_unseal_breakglass_command: "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new -i /home/tethys/.ssh/id_ed25519 -p 1122 brad@99.183.132.163 'cat ~/.ananke-breakglass/vault-unseal.key'" diff --git a/configs/ananke.titan-db.yaml b/configs/ananke.titan-db.yaml index bd61fb6..0cab587 100644 --- a/configs/ananke.titan-db.yaml +++ b/configs/ananke.titan-db.yaml @@ -282,6 +282,9 @@ startup: ignore_workloads: [] ignore_unavailable_nodes: [] auto_recycle_stuck_pods: true + auto_quarantine_scheduling_storms: true + scheduling_storm_event_threshold: 30 + scheduling_storm_window_seconds: 180 stuck_pod_grace_seconds: 180 vault_unseal_key_file: /var/lib/ananke/vault-unseal.key vault_unseal_breakglass_command: "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new -i /home/atlas/.ssh/id_ed25519 -p 1122 brad@99.183.132.163 'cat ~/.ananke-breakglass/vault-unseal.key'" diff --git a/internal/cluster/orchestrator_scheduling_storm.go b/internal/cluster/orchestrator_scheduling_storm.go new file mode 100644 index 0000000..dbf50ef --- /dev/null +++ b/internal/cluster/orchestrator_scheduling_storm.go @@ -0,0 +1,261 @@ +package cluster + +import ( + "context" + "encoding/json" + "fmt" + "sort" + "strings" + "time" +) + +// maybeAutoQuarantineSchedulingStorms runs one orchestration or CLI step. +// Signature: (o *Orchestrator) maybeAutoQuarantineSchedulingStorms(ctx context.Context, lastAttempt *time.Time). +// Why: a non-core workload that cannot schedule can emit enough warning events to +// thrash the control plane datastore; quarantine keeps startup moving while +// preserving core services. 
+func (o *Orchestrator) maybeAutoQuarantineSchedulingStorms(ctx context.Context, lastAttempt *time.Time) { + if o.runner.DryRun || !o.cfg.Startup.AutoQuarantineSchedulingStorms { + return + } + now := time.Now() + if lastAttempt != nil && !lastAttempt.IsZero() && now.Sub(*lastAttempt) < 30*time.Second { + return + } + if lastAttempt != nil { + *lastAttempt = now + } + o.bestEffort("quarantine non-core scheduling storm workloads", func() error { + return o.quarantineSchedulingStormWorkloads(ctx) + }) +} + +// quarantineSchedulingStormWorkloads runs one orchestration or CLI step. +// Signature: (o *Orchestrator) quarantineSchedulingStormWorkloads(ctx context.Context) error. +// Why: limits startup-only mitigation to workloads proven to be generating a +// scheduling event storm, instead of scaling optional apps down blindly. +func (o *Orchestrator) quarantineSchedulingStormWorkloads(ctx context.Context) error { + podsOut, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json") + if err != nil { + return fmt.Errorf("query pods for scheduling storm scan: %w", err) + } + var pods podList + if err := json.Unmarshal([]byte(podsOut), &pods); err != nil { + return fmt.Errorf("decode pods for scheduling storm scan: %w", err) + } + + rsOut, err := o.kubectl(ctx, 30*time.Second, "get", "replicasets", "-A", "-o", "json") + if err != nil { + return fmt.Errorf("query replicasets for scheduling storm scan: %w", err) + } + var rsList replicaSetList + if err := json.Unmarshal([]byte(rsOut), &rsList); err != nil { + return fmt.Errorf("decode replicasets for scheduling storm scan: %w", err) + } + + eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json") + if err != nil { + return fmt.Errorf("query events for scheduling storm scan: %w", err) + } + var events eventList + if err := json.Unmarshal([]byte(eventsOut), &events); err != nil { + return fmt.Errorf("decode events for scheduling storm scan: %w", err) + } + + workloadsOut, err := 
o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset", "-A", "-o", "json") + if err != nil { + return fmt.Errorf("query workloads for scheduling storm scan: %w", err) + } + var workloads workloadList + if err := json.Unmarshal([]byte(workloadsOut), &workloads); err != nil { + return fmt.Errorf("decode workloads for scheduling storm scan: %w", err) + } + + requiredNamespaces := o.startupRequiredWorkloadNamespaces() + ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces) + ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes) + ignoreRules := parseWorkloadIgnoreRules(o.cfg.Startup.IgnoreWorkloads) + eventThreshold := o.cfg.Startup.SchedulingStormEventThreshold + if eventThreshold <= 0 { + eventThreshold = 30 + } + window := time.Duration(o.cfg.Startup.SchedulingStormWindowSeconds) * time.Second + if window <= 0 { + window = 3 * time.Minute + } + + podsByKey := map[string]podResource{} + for _, pod := range pods.Items { + ns := strings.TrimSpace(pod.Metadata.Namespace) + name := strings.TrimSpace(pod.Metadata.Name) + if ns == "" || name == "" { + continue + } + podsByKey[ns+"/"+name] = pod + } + + rsOwners := map[string]ownerReference{} + for _, rs := range rsList.Items { + ns := strings.TrimSpace(rs.Metadata.Namespace) + name := strings.TrimSpace(rs.Metadata.Name) + if ns == "" || name == "" { + continue + } + for _, owner := range rs.Metadata.OwnerReferences { + kind := strings.TrimSpace(owner.Kind) + ownerName := strings.TrimSpace(owner.Name) + if kind == "" || ownerName == "" { + continue + } + rsOwners[ns+"/"+name] = owner + break + } + } + + workloadDesired := map[string]int32{} + for _, item := range workloads.Items { + kind := strings.ToLower(strings.TrimSpace(item.Kind)) + ns := strings.TrimSpace(item.Metadata.Namespace) + name := strings.TrimSpace(item.Metadata.Name) + if kind == "" || ns == "" || name == "" { + continue + } + desired, _, ok := desiredReady(item) + if !ok { + continue + } + 
workloadDesired[ns+"/"+kind+"/"+name] = desired + } + + quarantined := []string{} + seen := map[string]struct{}{} + now := time.Now() + for _, event := range events.Items { + if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") { + continue + } + if strings.TrimSpace(event.Reason) != "FailedScheduling" { + continue + } + if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") { + continue + } + lastSeen := eventLastObservedAt(event) + if !lastSeen.IsZero() && now.Sub(lastSeen) > window { + continue + } + count := eventObservationCount(event) + if count < eventThreshold { + continue + } + podKey := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name) + pod, ok := podsByKey[podKey] + if !ok { + continue + } + if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") { + continue + } + ns := strings.TrimSpace(pod.Metadata.Namespace) + if _, ok := requiredNamespaces[ns]; ok { + continue + } + if _, ok := ignoredNamespaces[ns]; ok { + continue + } + if workloadIgnored(ignoreRules, ns, "", strings.TrimSpace(pod.Metadata.Name)) { + continue + } + if podTargetsIgnoredNode(pod, ignoredNodes) { + continue + } + workload, ok := schedulingStormOwnerWorkload(pod, rsOwners) + if !ok { + continue + } + if workloadIgnored(ignoreRules, workload.Namespace, workload.Kind, workload.Name) { + continue + } + workloadKey := workload.Namespace + "/" + workload.Kind + "/" + workload.Name + if _, done := seen[workloadKey]; done { + continue + } + desired := workloadDesired[workloadKey] + if desired <= 0 { + continue + } + if err := o.ensureWorkloadReplicas(ctx, workload, 0); err != nil { + return fmt.Errorf("scale scheduling storm workload %s to 0: %w", workloadKey, err) + } + seen[workloadKey] = struct{}{} + quarantined = append(quarantined, fmt.Sprintf("%s events=%d window=%s", workloadKey, count, window)) + } + + if len(quarantined) == 0 { + return nil + } + sort.Strings(quarantined) + detail := 
fmt.Sprintf("quarantined scheduling storm workload(s): %s", joinLimited(quarantined, 8)) + o.log.Printf("%s", detail) + o.noteStartupAutoHeal(detail) + return nil +} + +// schedulingStormOwnerWorkload runs one orchestration or CLI step. +// Signature: schedulingStormOwnerWorkload(pod podResource, rsOwners map[string]ownerReference) (startupWorkload, bool). +// Why: scheduling storms happen at the pod layer, but safe mitigation needs to +// operate on the owning deployment or statefulset. +func schedulingStormOwnerWorkload(pod podResource, rsOwners map[string]ownerReference) (startupWorkload, bool) { + ns := strings.TrimSpace(pod.Metadata.Namespace) + for _, owner := range pod.Metadata.OwnerReferences { + switch strings.TrimSpace(owner.Kind) { + case "StatefulSet": + if name := strings.TrimSpace(owner.Name); name != "" { + return startupWorkload{Namespace: ns, Kind: "statefulset", Name: name}, true + } + case "ReplicaSet": + rsName := strings.TrimSpace(owner.Name) + if rsName == "" { + continue + } + rsOwner, ok := rsOwners[ns+"/"+rsName] + if !ok || strings.TrimSpace(rsOwner.Kind) != "Deployment" || strings.TrimSpace(rsOwner.Name) == "" { + continue + } + return startupWorkload{Namespace: ns, Kind: "deployment", Name: strings.TrimSpace(rsOwner.Name)}, true + } + } + return startupWorkload{}, false +} + +// eventObservationCount runs one orchestration or CLI step. +// Signature: eventObservationCount(event eventResource) int. +// Why: event count can live either on the root event or in the series payload; +// using the max keeps detection stable across Kubernetes versions. +func eventObservationCount(event eventResource) int { + count := event.Count + if event.Series.Count > count { + count = event.Series.Count + } + if count < 1 { + return 1 + } + return count +} + +// eventLastObservedAt runs one orchestration or CLI step. +// Signature: eventLastObservedAt(event eventResource) time.Time. 
+// Why: event recency fields vary by cluster version; prefer the newest explicit +// observation time and fall back to creation time when needed. +func eventLastObservedAt(event eventResource) time.Time { + switch { + case !event.Series.LastObservedTime.IsZero(): + return event.Series.LastObservedTime + case !event.LastTimestamp.IsZero(): + return event.LastTimestamp + case !event.EventTime.IsZero(): + return event.EventTime + default: + return event.Metadata.CreationTimestamp + } +} diff --git a/internal/cluster/orchestrator_storage_types.go b/internal/cluster/orchestrator_storage_types.go index 4760f1e..4fca02b 100644 --- a/internal/cluster/orchestrator_storage_types.go +++ b/internal/cluster/orchestrator_storage_types.go @@ -177,6 +177,46 @@ type jobConditionRef struct { Status string `json:"status"` } +type eventList struct { + Items []eventResource `json:"items"` +} + +type eventResource struct { + Metadata struct { + Namespace string `json:"namespace"` + CreationTimestamp time.Time `json:"creationTimestamp"` + } `json:"metadata"` + InvolvedObject struct { + Kind string `json:"kind"` + Namespace string `json:"namespace"` + Name string `json:"name"` + } `json:"involvedObject"` + Type string `json:"type"` + Reason string `json:"reason"` + Message string `json:"message"` + Count int `json:"count"` + EventTime time.Time `json:"eventTime"` + LastTimestamp time.Time `json:"lastTimestamp"` + Series eventSeries `json:"series"` +} + +type eventSeries struct { + Count int `json:"count"` + LastObservedTime time.Time `json:"lastObservedTime"` +} + +type replicaSetList struct { + Items []replicaSetResource `json:"items"` +} + +type replicaSetResource struct { + Metadata struct { + Namespace string `json:"namespace"` + Name string `json:"name"` + OwnerReferences []ownerReference `json:"ownerReferences"` + } `json:"metadata"` +} + type workloadResource struct { Kind string `json:"kind"` Metadata struct { @@ -221,6 +261,7 @@ type podResource struct { type ownerReference 
struct { Kind string `json:"kind"` + Name string `json:"name"` } type podContainerStatus struct { diff --git a/internal/cluster/orchestrator_workload_convergence.go b/internal/cluster/orchestrator_workload_convergence.go index 179b0ca..31f648c 100644 --- a/internal/cluster/orchestrator_workload_convergence.go +++ b/internal/cluster/orchestrator_workload_convergence.go @@ -26,10 +26,12 @@ func (o *Orchestrator) waitForWorkloadConvergence(ctx context.Context) error { lastLogged := time.Time{} lastRecycleAttempt := time.Time{} lastReplicaHeal := time.Time{} + lastSchedulingStormHeal := time.Time{} for { prevFailure := lastFailure o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt) o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal) + o.maybeAutoQuarantineSchedulingStorms(ctx, &lastSchedulingStormHeal) ready, detail, err := o.workloadConvergenceReady(ctx) if err != nil { lastFailure = err.Error() diff --git a/internal/cluster/testing_hooks_scheduling_storm.go b/internal/cluster/testing_hooks_scheduling_storm.go new file mode 100644 index 0000000..aeaadd7 --- /dev/null +++ b/internal/cluster/testing_hooks_scheduling_storm.go @@ -0,0 +1,88 @@ +package cluster + +import ( + "context" + "fmt" + "strings" + "time" +) + +// TestHookMaybeAutoQuarantineSchedulingStorms runs one orchestration or CLI step. +// Signature: (o *Orchestrator) TestHookMaybeAutoQuarantineSchedulingStorms(ctx context.Context, lastAttempt *time.Time). +// Why: exposes the scheduling-storm trigger guard to the split top-level test module. +func (o *Orchestrator) TestHookMaybeAutoQuarantineSchedulingStorms(ctx context.Context, lastAttempt *time.Time) { + o.maybeAutoQuarantineSchedulingStorms(ctx, lastAttempt) +} + +// TestHookQuarantineSchedulingStormWorkloads runs one orchestration or CLI step. +// Signature: (o *Orchestrator) TestHookQuarantineSchedulingStormWorkloads(ctx context.Context) error. +// Why: exposes the scheduling-storm auto-heal body to the split top-level test module. 
+func (o *Orchestrator) TestHookQuarantineSchedulingStormWorkloads(ctx context.Context) error { + return o.quarantineSchedulingStormWorkloads(ctx) +} + +// TestHookSchedulingStormOwnerWorkload runs one orchestration or CLI step. +// Signature: TestHookSchedulingStormOwnerWorkload(namespace string, ownerKind string, ownerName string, rsOwnerKind string, rsOwnerName string) (string, bool). +// Why: exposes owner-resolution behavior without leaking internal workload types. +func TestHookSchedulingStormOwnerWorkload( + namespace string, + ownerKind string, + ownerName string, + rsOwnerKind string, + rsOwnerName string, +) (string, bool) { + var pod podResource + pod.Metadata.Namespace = strings.TrimSpace(namespace) + pod.Metadata.OwnerReferences = []ownerReference{{ + Kind: strings.TrimSpace(ownerKind), + Name: strings.TrimSpace(ownerName), + }} + rsOwners := map[string]ownerReference{} + if rsName := strings.TrimSpace(ownerName); rsName != "" && strings.TrimSpace(ownerKind) == "ReplicaSet" { + rsOwners[pod.Metadata.Namespace+"/"+rsName] = ownerReference{ + Kind: strings.TrimSpace(rsOwnerKind), + Name: strings.TrimSpace(rsOwnerName), + } + } + workload, ok := schedulingStormOwnerWorkload(pod, rsOwners) + if !ok { + return "", false + } + return fmt.Sprintf("%s/%s/%s", workload.Namespace, workload.Kind, workload.Name), true +} + +// TestHookEventObservationCount runs one orchestration or CLI step. +// Signature: TestHookEventObservationCount(count int, seriesCount int) int. +// Why: exposes event-count normalization used by scheduling-storm detection. +func TestHookEventObservationCount(count int, seriesCount int) int { + return eventObservationCount(eventResource{ + Count: count, + Series: eventSeries{ + Count: seriesCount, + }, + }) +} + +// TestHookEventLastObservedAt runs one orchestration or CLI step. +// Signature: TestHookEventLastObservedAt(seriesLastObserved time.Time, lastTimestamp time.Time, eventTime time.Time, creationTimestamp time.Time) time.Time. 
+// Why: exposes event-time fallback behavior used by scheduling-storm detection. +func TestHookEventLastObservedAt( + seriesLastObserved time.Time, + lastTimestamp time.Time, + eventTime time.Time, + creationTimestamp time.Time, +) time.Time { + return eventLastObservedAt(eventResource{ + LastTimestamp: lastTimestamp, + EventTime: eventTime, + Series: eventSeries{ + LastObservedTime: seriesLastObserved, + }, + Metadata: struct { + Namespace string `json:"namespace"` + CreationTimestamp time.Time `json:"creationTimestamp"` + }{ + CreationTimestamp: creationTimestamp, + }, + }) +} diff --git a/internal/config/apply_defaults.go b/internal/config/apply_defaults.go index 7363fb6..58700d4 100644 --- a/internal/config/apply_defaults.go +++ b/internal/config/apply_defaults.go @@ -237,6 +237,12 @@ func (c *Config) applyDefaults() { if c.UPS.TelemetryTimeoutSeconds <= 0 { c.UPS.TelemetryTimeoutSeconds = 90 } + if c.Startup.SchedulingStormEventThreshold <= 0 { + c.Startup.SchedulingStormEventThreshold = 30 + } + if c.Startup.SchedulingStormWindowSeconds <= 0 { + c.Startup.SchedulingStormWindowSeconds = 180 + } if c.Coordination.ForwardShutdownConfig == "" { c.Coordination.ForwardShutdownConfig = "/etc/ananke/ananke.yaml" } diff --git a/internal/config/types.go b/internal/config/types.go index cd8eea1..46bbf0e 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -87,6 +87,9 @@ type Startup struct { IgnoreWorkloads []string `yaml:"ignore_workloads"` IgnoreUnavailableNodes []string `yaml:"ignore_unavailable_nodes"` AutoRecycleStuckPods bool `yaml:"auto_recycle_stuck_pods"` + AutoQuarantineSchedulingStorms bool `yaml:"auto_quarantine_scheduling_storms"` + SchedulingStormEventThreshold int `yaml:"scheduling_storm_event_threshold"` + SchedulingStormWindowSeconds int `yaml:"scheduling_storm_window_seconds"` StuckPodGraceSeconds int `yaml:"stuck_pod_grace_seconds"` VaultUnsealKeyFile string `yaml:"vault_unseal_key_file"` VaultUnsealBreakglassCommand string 
`yaml:"vault_unseal_breakglass_command"` diff --git a/internal/config/validate.go b/internal/config/validate.go index 1db2690..84f97a9 100644 --- a/internal/config/validate.go +++ b/internal/config/validate.go @@ -343,6 +343,14 @@ func (c Config) Validate() error { return fmt.Errorf("config.coordination.forward_shutdown_config must not be empty when forward_shutdown_host is set") } } + if c.Startup.AutoQuarantineSchedulingStorms { + if c.Startup.SchedulingStormEventThreshold <= 0 { + return fmt.Errorf("config.startup.scheduling_storm_event_threshold must be > 0 when auto_quarantine_scheduling_storms is enabled") + } + if c.Startup.SchedulingStormWindowSeconds <= 0 { + return fmt.Errorf("config.startup.scheduling_storm_window_seconds must be > 0 when auto_quarantine_scheduling_storms is enabled") + } + } for _, peer := range c.Coordination.PeerHosts { if strings.TrimSpace(peer) == "" { return fmt.Errorf("config.coordination.peer_hosts entries must not be empty") diff --git a/internal/config/validate_matrix_test.go b/internal/config/validate_matrix_test.go index ff01346..3d61e3f 100644 --- a/internal/config/validate_matrix_test.go +++ b/internal/config/validate_matrix_test.go @@ -92,6 +92,14 @@ func TestValidateRejectsInvalidFieldsMatrix(t *testing.T) { }}, {"bad_empty_ignore_unavailable_nodes_entry", func(c *Config) { c.Startup.IgnoreUnavailableNodes = []string{"", "titan-22"} }}, {"bad_empty_vault_key_file", func(c *Config) { c.Startup.VaultUnsealKeyFile = "" }}, + {"bad_scheduling_storm_threshold", func(c *Config) { + c.Startup.AutoQuarantineSchedulingStorms = true + c.Startup.SchedulingStormEventThreshold = 0 + }}, + {"bad_scheduling_storm_window", func(c *Config) { + c.Startup.AutoQuarantineSchedulingStorms = true + c.Startup.SchedulingStormWindowSeconds = 0 + }}, {"bad_ssh_port_low", func(c *Config) { c.SSHPort = 0 }}, {"bad_ups_provider", func(c *Config) { c.UPS.Enabled = true; c.UPS.Provider = "" }}, {"bad_ups_on_battery_grace_negative", func(c *Config) { 
c.UPS.Enabled = true; c.UPS.OnBatteryGraceSeconds = -1 }}, diff --git a/testing/orchestrator/hooks_scheduling_storm_test.go b/testing/orchestrator/hooks_scheduling_storm_test.go new file mode 100644 index 0000000..89515ff --- /dev/null +++ b/testing/orchestrator/hooks_scheduling_storm_test.go @@ -0,0 +1,432 @@ +package orchestrator + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "scm.bstein.dev/bstein/ananke/internal/cluster" +) + +// TestHookSchedulingStormHelpers runs one orchestration or CLI step. +// Signature: TestHookSchedulingStormHelpers(t *testing.T). +// Why: keeps scheduling-storm helper coverage in the split top-level testing module +// required by the repo hygiene contract. +func TestHookSchedulingStormHelpers(t *testing.T) { + if got, ok := cluster.TestHookSchedulingStormOwnerWorkload("ai", "ReplicaSet", "ollama-rs", "Deployment", "ollama"); !ok || got != "ai/deployment/ollama" { + t.Fatalf("unexpected deployment owner resolution: got=%q ok=%v", got, ok) + } + if got, ok := cluster.TestHookSchedulingStormOwnerWorkload("storage", "StatefulSet", "nextcloud", "", ""); !ok || got != "storage/statefulset/nextcloud" { + t.Fatalf("unexpected statefulset owner resolution: got=%q ok=%v", got, ok) + } + if got, ok := cluster.TestHookSchedulingStormOwnerWorkload("ai", "ReplicaSet", "missing", "", ""); ok || got != "" { + t.Fatalf("expected missing replicaset owner lookup to fail, got=%q ok=%v", got, ok) + } + + if got := cluster.TestHookEventObservationCount(3, 9); got != 9 { + t.Fatalf("expected series count to win, got %d", got) + } + if got := cluster.TestHookEventObservationCount(0, 0); got != 1 { + t.Fatalf("expected zero-count normalization to 1, got %d", got) + } + + now := time.Now().UTC().Round(time.Second) + if got := cluster.TestHookEventLastObservedAt(now, now.Add(-time.Minute), now.Add(-2*time.Minute), now.Add(-3*time.Minute)); !got.Equal(now) { + t.Fatalf("expected series timestamp priority, got %s", got) + } + if got := 
cluster.TestHookEventLastObservedAt(time.Time{}, now, now.Add(-time.Minute), now.Add(-2*time.Minute)); !got.Equal(now) { + t.Fatalf("expected lastTimestamp fallback, got %s", got) + } + if got := cluster.TestHookEventLastObservedAt(time.Time{}, time.Time{}, now, now.Add(-time.Minute)); !got.Equal(now) { + t.Fatalf("expected eventTime fallback, got %s", got) + } + if got := cluster.TestHookEventLastObservedAt(time.Time{}, time.Time{}, time.Time{}, now); !got.Equal(now) { + t.Fatalf("expected creationTimestamp fallback, got %s", got) + } +} + +// TestHookSchedulingStormQuarantine runs one orchestration or CLI step. +// Signature: TestHookSchedulingStormQuarantine(t *testing.T). +// Why: verifies that only non-core workloads generating real scheduling storms +// are auto-quarantined, which prevents event/Kine churn from spiking control-plane CPU. +func TestHookSchedulingStormQuarantine(t *testing.T) { + now := time.Now().UTC().Format(time.RFC3339) + cfg := lifecycleConfig(t) + cfg.Startup.AutoQuarantineSchedulingStorms = true + cfg.Startup.SchedulingStormEventThreshold = 30 + cfg.Startup.SchedulingStormWindowSeconds = 180 + cfg.Startup.WorkloadConvergenceRequiredNamespaces = []string{"vault"} + cfg.Startup.IgnoreWorkloadNamespaces = []string{"ignored-ns"} + cfg.Startup.IgnoreWorkloads = []string{"monitoring/deployment/ignore-me"} + cfg.Startup.IgnoreUnavailableNodes = []string{"titan-22"} + scaledOllama := false + + run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { + command := name + " " + strings.Join(args, " ") + switch { + case name == "kubectl" && strings.Contains(command, "get pods -A -o json"): + return `{"items":[ + {"metadata":{"namespace":"ai","name":"ollama-pod","ownerReferences":[{"kind":"ReplicaSet","name":"ollama-rs"}]},"spec":{},"status":{"phase":"Pending"}}, + 
{"metadata":{"namespace":"vault","name":"vault-0","ownerReferences":[{"kind":"StatefulSet","name":"vault"}]},"spec":{},"status":{"phase":"Pending"}}, + {"metadata":{"namespace":"ignored-ns","name":"skip-pod","ownerReferences":[{"kind":"ReplicaSet","name":"skip-rs"}]},"spec":{},"status":{"phase":"Pending"}}, + {"metadata":{"namespace":"monitoring","name":"ignore-me-pod","ownerReferences":[{"kind":"ReplicaSet","name":"ignore-me-rs"}]},"spec":{},"status":{"phase":"Pending"}}, + {"metadata":{"namespace":"monitoring","name":"ignored-node-pod","ownerReferences":[{"kind":"ReplicaSet","name":"ignored-node-rs"}]},"spec":{"nodeName":"titan-22"},"status":{"phase":"Pending"}}, + {"metadata":{"namespace":"monitoring","name":"running-pod","ownerReferences":[{"kind":"ReplicaSet","name":"running-rs"}]},"spec":{},"status":{"phase":"Running"}} + ]}`, nil + case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"): + return `{"items":[ + {"metadata":{"namespace":"ai","name":"ollama-rs","ownerReferences":[{"kind":"Deployment","name":"ollama"}]}}, + {"metadata":{"namespace":"ignored-ns","name":"skip-rs","ownerReferences":[{"kind":"Deployment","name":"skip"}]}}, + {"metadata":{"namespace":"monitoring","name":"ignore-me-rs","ownerReferences":[{"kind":"Deployment","name":"ignore-me"}]}}, + {"metadata":{"namespace":"monitoring","name":"ignored-node-rs","ownerReferences":[{"kind":"Deployment","name":"ignored-node"}]}}, + {"metadata":{"namespace":"monitoring","name":"running-rs","ownerReferences":[{"kind":"Deployment","name":"running"}]}} + ]}`, nil + case name == "kubectl" && strings.Contains(command, "get events -A -o json"): + return `{"items":[ + {"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"ai","name":"ollama-pod"},"type":"Warning","reason":"FailedScheduling","count":45}, + {"metadata":{"creationTimestamp":"` + now + 
`"},"involvedObject":{"kind":"Pod","namespace":"vault","name":"vault-0"},"type":"Warning","reason":"FailedScheduling","count":45}, + {"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"ignored-ns","name":"skip-pod"},"type":"Warning","reason":"FailedScheduling","count":45}, + {"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"ignore-me-pod"},"type":"Warning","reason":"FailedScheduling","count":45}, + {"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"ignored-node-pod"},"type":"Warning","reason":"FailedScheduling","count":45}, + {"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"running-pod"},"type":"Warning","reason":"FailedScheduling","count":45}, + {"metadata":{"creationTimestamp":"2000-01-01T00:00:00Z"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"stale-pod"},"type":"Warning","reason":"FailedScheduling","count":99} + ]}`, nil + case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"): + return `{"items":[ + {"kind":"Deployment","metadata":{"namespace":"ai","name":"ollama"},"spec":{"replicas":1}}, + {"kind":"StatefulSet","metadata":{"namespace":"vault","name":"vault"},"spec":{"replicas":1}}, + {"kind":"Deployment","metadata":{"namespace":"ignored-ns","name":"skip"},"spec":{"replicas":1}}, + {"kind":"Deployment","metadata":{"namespace":"monitoring","name":"ignore-me"},"spec":{"replicas":1}}, + {"kind":"Deployment","metadata":{"namespace":"monitoring","name":"ignored-node"},"spec":{"replicas":1}}, + {"kind":"Deployment","metadata":{"namespace":"monitoring","name":"running"},"spec":{"replicas":1}} + ]}`, nil + case name == "kubectl" && strings.Contains(command, "-n ai scale deployment ollama --replicas=0"): + scaledOllama = true + return "", nil + default: + return 
lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...) + } + } + + orch, _ := newHookOrchestrator(t, cfg, run, run) + orch.TestHookBeginStartupReport("scheduling-storm") + defer orch.TestHookFinalizeStartupReport(nil) + + if err := orch.TestHookQuarantineSchedulingStormWorkloads(context.Background()); err != nil { + t.Fatalf("quarantine scheduling storm workloads: %v", err) + } + if !scaledOllama { + t.Fatalf("expected ollama deployment to be scaled to zero") + } + progress := readStartupProgress(t, orch) + if !strings.Contains(progress, "ollama") { + t.Fatalf("expected startup progress to mention ollama quarantine, payload=%s", progress) + } + if strings.Contains(progress, "vault") || strings.Contains(progress, "ignore-me") || strings.Contains(progress, "ignored-node") { + t.Fatalf("expected only the non-core eligible workload to be quarantined, payload=%s", progress) + } +} + +// TestHookSchedulingStormTriggerGuards runs one orchestration or CLI step. +// Signature: TestHookSchedulingStormTriggerGuards(t *testing.T). +// Why: covers dry-run/disabled/rate-limit guards so the scheduler-storm auto-heal +// only activates when the cluster is actually suffering this exact failure mode. 
func TestHookSchedulingStormTriggerGuards(t *testing.T) {
	// Guard 1: feature flag off. The trigger must bail out before doing any
	// work, leaving lastAttempt untouched (no scan, no rate-limit bookkeeping).
	cfgDisabled := lifecycleConfig(t)
	orchDisabled, _ := newHookOrchestrator(t, cfgDisabled, nil, nil)
	lastAttempt := time.Time{}
	orchDisabled.TestHookMaybeAutoQuarantineSchedulingStorms(context.Background(), &lastAttempt)
	if !lastAttempt.IsZero() {
		t.Fatalf("expected disabled scheduling-storm trigger to be skipped")
	}

	// Guard 2: dry-run mode. Even with the flag enabled, a dry-run
	// orchestrator must skip the trigger. lastAttempt is reused from guard 1
	// (still zero if that guard passed), so it also detects leaked state.
	cfgDry := lifecycleConfig(t)
	cfgDry.Startup.AutoQuarantineSchedulingStorms = true
	orchDry := newDryRunHookOrchestrator(t, cfgDry, nil)
	orchDry.TestHookMaybeAutoQuarantineSchedulingStorms(context.Background(), &lastAttempt)
	if !lastAttempt.IsZero() {
		t.Fatalf("expected dry-run scheduling-storm trigger to be skipped")
	}

	// Guard 3: rate limiting. Feature enabled and thresholds set, but
	// lastAttempt is "now", so the trigger must skip all kubectl scans.
	cfgRate := lifecycleConfig(t)
	cfgRate.Startup.AutoQuarantineSchedulingStorms = true
	cfgRate.Startup.SchedulingStormEventThreshold = 5
	cfgRate.Startup.SchedulingStormWindowSeconds = 60
	recorder := &commandRecorder{}
	// Fake runner: empty-cluster answers for the four scan queries; anything
	// else falls through to the shared lifecycle dispatcher. Every call is
	// recorded so the assertion below can prove no scan was issued.
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		recorder.record(name, args)
		command := name + " " + strings.Join(args, " ")
		switch {
		case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
			return `{"items":[]}`, nil
		case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
			return `{"items":[]}`, nil
		case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
			return `{"items":[]}`, nil
		case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
			return `{"items":[]}`, nil
		default:
			return lifecycleDispatcher(recorder)(ctx, timeout, name, args...)
		}
	}
	orchRate, _ := newHookOrchestrator(t, cfgRate, run, run)
	lastAttempt = time.Now() // a fresh attempt puts us inside the rate-limit window
	orchRate.TestHookMaybeAutoQuarantineSchedulingStorms(context.Background(), &lastAttempt)
	if recorder.contains("get pods -A -o json") {
		t.Fatalf("expected rate-limited scheduling-storm trigger to skip kubectl scans")
	}
}

// TestHookSchedulingStormTriggerAndNoOpBranches exercises the success/no-op
// paths of the scheduling-storm trigger.
// Signature: TestHookSchedulingStormTriggerAndNoOpBranches(t *testing.T).
// Why: raises scheduling-storm branch coverage on the success/no-op paths so the
// auto-heal only acts on genuine event storms and stays quiet otherwise.
func TestHookSchedulingStormTriggerAndNoOpBranches(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.Startup.AutoQuarantineSchedulingStorms = true
	// Zero threshold/window: exercise whatever defaulting the scan applies
	// when the operator leaves these unset.
	cfg.Startup.SchedulingStormEventThreshold = 0
	cfg.Startup.SchedulingStormWindowSeconds = 0

	scanRan := false
	// Fake runner: every fixture item below is crafted to hit a distinct
	// skip branch, so the scan runs end to end yet quarantines nothing.
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		switch {
		case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
			scanRan = true
			// Pods: empty namespace (malformed), no owner reference,
			// already Running, and a Pending pod whose owning Deployment
			// is already scaled to zero.
			return `{"items":[
				{"metadata":{"namespace":"","name":"missing"}},
				{"metadata":{"namespace":"monitoring","name":"no-owner"},"spec":{},"status":{"phase":"Pending"}},
				{"metadata":{"namespace":"monitoring","name":"done","ownerReferences":[{"kind":"ReplicaSet","name":"done-rs"}]},"spec":{},"status":{"phase":"Running"}},
				{"metadata":{"namespace":"monitoring","name":"zero-replicas","ownerReferences":[{"kind":"ReplicaSet","name":"zero-rs"}]},"spec":{},"status":{"phase":"Pending"}}
			]}`, nil
		case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
			// ReplicaSets: empty namespace, blank owner kind, and the one
			// that links zero-replicas -> Deployment "zero".
			return `{"items":[
				{"metadata":{"namespace":"","name":"bad-rs"}},
				{"metadata":{"namespace":"monitoring","name":"done-rs","ownerReferences":[{"kind":"","name":"ignored"}]}},
				{"metadata":{"namespace":"monitoring","name":"zero-rs","ownerReferences":[{"kind":"Deployment","name":"zero"}]}}
			]}`, nil
		case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
			// Events: each one should be filtered — Normal type, wrong
			// reason, non-Pod kind, stale timestamp, count below any
			// threshold, pod not in the pods fixture, pod already Running,
			// pod with no owner, and a pod whose deployment is at zero.
			return `{"items":[
				{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"normal"},"type":"Normal","reason":"FailedScheduling","count":99},
				{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"wrong-reason"},"type":"Warning","reason":"SomeOtherReason","count":99},
				{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Service","namespace":"monitoring","name":"wrong-kind"},"type":"Warning","reason":"FailedScheduling","count":99},
				{"metadata":{"creationTimestamp":"2000-01-01T00:00:00Z"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"old"},"type":"Warning","reason":"FailedScheduling","count":99},
				{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"low-count"},"type":"Warning","reason":"FailedScheduling","count":1},
				{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"missing-pod"},"type":"Warning","reason":"FailedScheduling","count":99},
				{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"done"},"type":"Warning","reason":"FailedScheduling","count":99},
				{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"no-owner"},"type":"Warning","reason":"FailedScheduling","count":99},
				{"metadata":{"creationTimestamp":"` + time.Now().UTC().Format(time.RFC3339) + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"zero-replicas"},"type":"Warning","reason":"FailedScheduling","count":99}
			]}`, nil
		case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
			// Workloads: blank kind (malformed), unsupported kind (Job),
			// and the Deployment already at zero replicas.
			return `{"items":[
				{"kind":"","metadata":{"namespace":"monitoring","name":"blank-kind"}},
				{"kind":"Job","metadata":{"namespace":"monitoring","name":"unsupported"}},
				{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"zero"},"spec":{"replicas":0}}
			]}`, nil
		default:
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
	}

	orch, _ := newHookOrchestrator(t, cfg, run, run)
	orch.TestHookBeginStartupReport("scheduling-storm-noop")
	defer orch.TestHookFinalizeStartupReport(nil)

	lastAttempt := time.Time{}
	orch.TestHookMaybeAutoQuarantineSchedulingStorms(context.Background(), &lastAttempt)
	// A completed scan — even a no-op one — must stamp lastAttempt so the
	// rate limiter works on subsequent ticks.
	if lastAttempt.IsZero() {
		t.Fatalf("expected successful scheduling-storm trigger to update lastAttempt")
	}
	if !scanRan {
		t.Fatalf("expected scheduling-storm scan to execute")
	}
	progress := readStartupProgress(t, orch)
	// None of the fixtures qualify, so the startup report must not claim a
	// quarantine happened.
	if strings.Contains(progress, "quarantined scheduling storm workload") {
		t.Fatalf("expected no-op scheduling-storm scan to avoid auto-heal output, payload=%s", progress)
	}
}

// TestHookSchedulingStormErrorMatrix exercises the error-reporting branches of
// the scheduling-storm scan.
// Signature: TestHookSchedulingStormErrorMatrix(t *testing.T).
// Why: covers malformed/error response branches in the scheduling-storm scan so
// Ananke can surface precise diagnostics when the API itself is part of the problem.
func TestHookSchedulingStormErrorMatrix(t *testing.T) {
	// Each case breaks exactly one stage of the four-query scan
	// (pods -> replicasets -> events -> workloads), either at the kubectl
	// call itself ("... boom" error) or at JSON decode ("{" payload), and
	// asserts the scan wraps the failure with a stage-specific message.
	cases := []struct {
		name    string
		run     func(context.Context, time.Duration, string, ...string) (string, error)
		wantErr string
	}{
		{
			// The very first kubectl call fails.
			name: "pods-query-error",
			run: func(_ context.Context, _ time.Duration, name string, _ ...string) (string, error) {
				if name == "kubectl" {
					return "", errors.New("pods boom")
				}
				return "", nil
			},
			wantErr: "query pods for scheduling storm scan",
		},
		{
			// Pods query succeeds but returns truncated JSON.
			name: "pods-decode-error",
			run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
				if name == "kubectl" && strings.Contains(strings.Join(args, " "), "get pods -A -o json") {
					return "{", nil
				}
				return `{"items":[]}`, nil
			},
			wantErr: "decode pods for scheduling storm scan",
		},
		{
			// Pods succeed; the replicasets query fails.
			name: "replicasets-query-error",
			run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
					return `{"items":[]}`, nil
				case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
					return "", errors.New("replicasets boom")
				default:
					return "", nil
				}
			},
			wantErr: "query replicasets for scheduling storm scan",
		},
		{
			// Replicasets query succeeds but returns truncated JSON.
			name: "replicasets-decode-error",
			run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
					return `{"items":[]}`, nil
				case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
					return "{", nil
				default:
					return `{"items":[]}`, nil
				}
			},
			wantErr: "decode replicasets for scheduling storm scan",
		},
		{
			// First two queries succeed; the events query fails.
			name: "events-query-error",
			run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
					return `{"items":[]}`, nil
				case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
					return `{"items":[]}`, nil
				case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
					return "", errors.New("events boom")
				default:
					return "", nil
				}
			},
			wantErr: "query events for scheduling storm scan",
		},
		{
			// Events query succeeds but returns truncated JSON.
			name: "events-decode-error",
			run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
					return `{"items":[]}`, nil
				case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
					return `{"items":[]}`, nil
				case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
					return "{", nil
				default:
					return `{"items":[]}`, nil
				}
			},
			wantErr: "decode events for scheduling storm scan",
		},
		{
			// First three queries succeed; the workloads query fails.
			name: "workloads-query-error",
			run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
					return `{"items":[]}`, nil
				case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
					return `{"items":[]}`, nil
				case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
					return `{"items":[]}`, nil
				case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
					return "", errors.New("workloads boom")
				default:
					return "", nil
				}
			},
			wantErr: "query workloads for scheduling storm scan",
		},
		{
			// Workloads query succeeds but returns truncated JSON.
			name: "workloads-decode-error",
			run: func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
					return `{"items":[]}`, nil
				case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"):
					return `{"items":[]}`, nil
				case name == "kubectl" && strings.Contains(command, "get events -A -o json"):
					return `{"items":[]}`, nil
				case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
					return "{", nil
				default:
					return "", nil
				}
			},
			wantErr: "decode workloads for scheduling storm scan",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Fresh config/orchestrator per case so one failure mode cannot
			// leak into the next.
			cfg := lifecycleConfig(t)
			cfg.Startup.AutoQuarantineSchedulingStorms = true
			orch, _ := newHookOrchestrator(t, cfg, tc.run, tc.run)
			err := orch.TestHookQuarantineSchedulingStormWorkloads(context.Background())
			if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
				t.Fatalf("expected error containing %q, got %v", tc.wantErr, err)
			}
		})
	}
}

// TestHookSchedulingStormScaleError exercises the final error path of the
// scheduling-storm auto-heal.
// Signature: TestHookSchedulingStormScaleError(t *testing.T).
// Why: covers the final error path where Ananke detects a real storm but cannot
// scale the offending workload down.
+func TestHookSchedulingStormScaleError(t *testing.T) { + now := time.Now().UTC().Format(time.RFC3339) + cfg := lifecycleConfig(t) + cfg.Startup.AutoQuarantineSchedulingStorms = true + cfg.Startup.SchedulingStormEventThreshold = 5 + cfg.Startup.SchedulingStormWindowSeconds = 60 + + run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { + command := name + " " + strings.Join(args, " ") + switch { + case name == "kubectl" && strings.Contains(command, "get pods -A -o json"): + return `{"items":[{"metadata":{"namespace":"ai","name":"ollama-pod","ownerReferences":[{"kind":"ReplicaSet","name":"ollama-rs"}]},"spec":{},"status":{"phase":"Pending"}}]}`, nil + case name == "kubectl" && strings.Contains(command, "get replicasets -A -o json"): + return `{"items":[{"metadata":{"namespace":"ai","name":"ollama-rs","ownerReferences":[{"kind":"Deployment","name":"ollama"}]}}]}`, nil + case name == "kubectl" && strings.Contains(command, "get events -A -o json"): + return `{"items":[{"metadata":{"creationTimestamp":"` + now + `"},"involvedObject":{"kind":"Pod","namespace":"ai","name":"ollama-pod"},"type":"Warning","reason":"FailedScheduling","count":45}]}`, nil + case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"): + return `{"items":[{"kind":"Deployment","metadata":{"namespace":"ai","name":"ollama"},"spec":{"replicas":1}}]}`, nil + case name == "kubectl" && strings.Contains(command, "-n ai scale deployment ollama --replicas=0"): + return "", errors.New("scale denied") + default: + return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...) + } + } + + orch, _ := newHookOrchestrator(t, cfg, run, run) + err := orch.TestHookQuarantineSchedulingStormWorkloads(context.Background()) + if err == nil || !strings.Contains(err.Error(), "scale scheduling storm workload ai/deployment/ollama to 0") { + t.Fatalf("expected scale error, got %v", err) + } +}