From 85c0741b3eacec4fc320d5a84fc379a6a2039fa0 Mon Sep 17 00:00:00 2001 From: codex Date: Fri, 19 Jun 2026 15:43:44 -0300 Subject: [PATCH] recovery: expire automatic node cordons --- configs/ananke.example.yaml | 1 + configs/ananke.tethys.yaml | 1 + configs/ananke.titan-db.yaml | 1 + internal/cluster/orchestrator_autorepair.go | 49 +- internal/cluster/orchestrator_cordon_lease.go | 339 +++++++++++ .../cluster/orchestrator_cordon_lease_test.go | 143 +++++ ...rator_critical_endpoint_additional_test.go | 99 ++++ .../cluster/orchestrator_critical_vault.go | 41 -- internal/cluster/orchestrator_lifecycle.go | 72 --- .../cluster/orchestrator_longhorn_recovery.go | 254 +++++++++ internal/cluster/orchestrator_pod_recovery.go | 139 +++++ internal/cluster/orchestrator_shutdown.go | 82 +++ .../orchestrator_unit_additional_test.go | 530 ------------------ .../orchestrator_workload_convergence.go | 380 +------------ .../orchestrator_workload_recovery_test.go | 458 +++++++++++++++ .../cluster/orchestrator_workload_status.go | 49 ++ internal/config/apply_defaults.go | 3 + internal/config/defaults.go | 1 + internal/config/types.go | 1 + internal/config/validate.go | 3 + testing/hygiene/in_tree_test_allowlist.txt | 3 + 21 files changed, 1610 insertions(+), 1039 deletions(-) create mode 100644 internal/cluster/orchestrator_cordon_lease.go create mode 100644 internal/cluster/orchestrator_cordon_lease_test.go create mode 100644 internal/cluster/orchestrator_critical_endpoint_additional_test.go create mode 100644 internal/cluster/orchestrator_longhorn_recovery.go create mode 100644 internal/cluster/orchestrator_pod_recovery.go create mode 100644 internal/cluster/orchestrator_shutdown.go create mode 100644 internal/cluster/orchestrator_workload_recovery_test.go create mode 100644 internal/cluster/orchestrator_workload_status.go diff --git a/configs/ananke.example.yaml b/configs/ananke.example.yaml index 1ce7c00..059b27d 100644 --- a/configs/ananke.example.yaml +++ b/configs/ananke.example.yaml @@ -156,6 +156,7 @@ startup: scheduling_storm_window_seconds: 180 stuck_pod_grace_seconds: 180 post_start_auto_heal_seconds: 60 + recovery_cordon_max_seconds: 3600 dead_node_cleanup_grace_seconds: 300 vault_unseal_key_file: /var/lib/ananke/vault-unseal.key vault_unseal_breakglass_command: "" diff --git a/configs/ananke.tethys.yaml b/configs/ananke.tethys.yaml index 068e0e4..2fc00fc 100644 --- a/configs/ananke.tethys.yaml +++ b/configs/ananke.tethys.yaml @@ -291,6 +291,7 @@ startup: scheduling_storm_window_seconds: 180 stuck_pod_grace_seconds: 180 post_start_auto_heal_seconds: 60 + recovery_cordon_max_seconds: 3600 dead_node_cleanup_grace_seconds: 300 vault_unseal_key_file: /var/lib/ananke/vault-unseal.key vault_unseal_breakglass_command: "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new -i /home/tethys/.ssh/id_ed25519 -p 1122 brad@99.183.132.163 'cat ~/.ananke-breakglass/vault-unseal.key'" diff --git a/configs/ananke.titan-db.yaml b/configs/ananke.titan-db.yaml index 64954bc..02972c9 100644 --- a/configs/ananke.titan-db.yaml +++ b/configs/ananke.titan-db.yaml @@ -291,6 +291,7 @@ startup: scheduling_storm_window_seconds: 180 stuck_pod_grace_seconds: 180 post_start_auto_heal_seconds: 60 + recovery_cordon_max_seconds: 3600 dead_node_cleanup_grace_seconds: 300 vault_unseal_key_file: /var/lib/ananke/vault-unseal.key vault_unseal_breakglass_command: "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new -i /home/atlas/.ssh/id_ed25519 -p 1122 brad@99.183.132.163 'cat ~/.ananke-breakglass/vault-unseal.key'" diff --git a/internal/cluster/orchestrator_autorepair.go b/internal/cluster/orchestrator_autorepair.go index 8f392f3..3c9cb87 100644 --- a/internal/cluster/orchestrator_autorepair.go +++ b/internal/cluster/orchestrator_autorepair.go @@ -10,20 +10,27 @@ import ( ) type nodeReadyList struct { - Items []struct { - Metadata struct { - Name string `json:"name"` - } `json:"metadata"` - Spec struct { - Unschedulable bool `json:"unschedulable"` - } `json:"spec"` - Status struct { - Conditions []struct { - Type string `json:"type"` - Status string `json:"status"` - } `json:"conditions"` - } `json:"status"` - } `json:"items"` + Items []nodeReadyItem `json:"items"` +} + +type nodeReadyItem struct { + Metadata struct { + Name string `json:"name"` + Annotations map[string]string `json:"annotations"` + } `json:"metadata"` + Spec struct { + Unschedulable bool `json:"unschedulable"` + Taints []struct { + Key string `json:"key"` + TimeAdded time.Time `json:"timeAdded"` + } `json:"taints"` + } `json:"spec"` + Status struct { + Conditions []struct { + Type string `json:"type"` + Status string `json:"status"` + } `json:"conditions"` + } `json:"status"` } type readyNodeCandidate struct { @@ -68,6 +75,13 @@ func (o *Orchestrator) postStartAutoHeal(ctx context.Context) error { errs = append(errs, fmt.Sprintf("required node labels: %v", err)) } + releasedCordons, err := o.enforceRecoveryCordonLeases(ctx) + if err != nil { + errs = append(errs, fmt.Sprintf("recovery cordon lease check: %v", err)) + } else if releasedCordons > 0 { + requestReconcile = true + } + vaultRecovered, err := o.autoRecoverSealedVault(ctx) if err != nil { errs = append(errs, fmt.Sprintf("vault auto-recovery: %v", err)) @@ -252,7 +266,7 @@ func (o *Orchestrator) repairBrokenKubeletProxies(ctx context.Context) (int, err } if !node.Unschedulable { - if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node.Name); err != nil { + if err := o.cordonNodeWithLease(ctx, node.Name, cordonReasonKubeletProxy, "broken kubelet proxy before k3s-agent restart"); err != nil { errs = append(errs, fmt.Sprintf("%s cordon before kubelet restart: %v", node.Name, err)) continue } @@ -262,8 +276,7 @@ func (o *Orchestrator) repairBrokenKubeletProxies(ctx context.Context) (int, err if _, err := o.sshWithTimeout(ctx, node.Name, "sudo -n systemctl restart k3s-agent", 90*time.Second); err != nil { if !node.Unschedulable { o.bestEffort("uncordon node after failed kubelet proxy repair", func() error { - _, uncordonErr := o.kubectl(ctx, 20*time.Second, "uncordon", node.Name) - return uncordonErr + return o.uncordonAndClearCordonLease(ctx, node.Name, cordonReasonKubeletProxy) }) } errs = append(errs, fmt.Sprintf("%s restart k3s-agent: %v", node.Name, err)) @@ -279,7 +292,7 @@ func (o *Orchestrator) repairBrokenKubeletProxies(ctx context.Context) (int, err continue } if !node.Unschedulable { - if _, err := o.kubectl(ctx, 20*time.Second, "uncordon", node.Name); err != nil { + if err := o.uncordonAndClearCordonLease(ctx, node.Name, cordonReasonKubeletProxy); err != nil { errs = append(errs, fmt.Sprintf("%s uncordon after kubelet proxy repair: %v", node.Name, err)) continue } diff --git a/internal/cluster/orchestrator_cordon_lease.go b/internal/cluster/orchestrator_cordon_lease.go new file mode 100644 index 0000000..f01afab --- /dev/null +++ b/internal/cluster/orchestrator_cordon_lease.go @@ -0,0 +1,339 @@ +package cluster + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "time" +) + +const ( + anankeCordonOwnerAnnotation = "ananke.bstein.dev/cordon-owner" + anankeCordonReasonAnnotation = "ananke.bstein.dev/cordon-reason" + anankeCordonDetailAnnotation = "ananke.bstein.dev/cordon-detail" + anankeCordonCreatedAnnotation = "ananke.bstein.dev/cordon-created-at" + anankeCordonDeadlineAnnotation = "ananke.bstein.dev/cordon-deadline" + anankeCordonManualActionAnnotation = "ananke.bstein.dev/manual-action" + anankeCordonManualAtAnnotation = "ananke.bstein.dev/manual-action-at" +) + +const ( + cordonReasonMissingCryptsetup = "missing-cryptsetup" + cordonReasonRuntimeWedge = "container-runtime-wedge" + cordonReasonKubeletProxy = "kubelet-proxy-repair" +) + +// cordonNodeWithLease cordons a node and records an expiry-bound owner/reason. +// Signature: (o *Orchestrator) cordonNodeWithLease(ctx context.Context, node, reason, detail string) error. +// Why: recovery cordons must be accountable leases so a node cannot be stranded +// indefinitely after an automatic repair attempt. +func (o *Orchestrator) cordonNodeWithLease(ctx context.Context, node string, reason string, detail string) error { + node = strings.TrimSpace(node) + reason = strings.TrimSpace(reason) + detail = sanitizeCordonAnnotationValue(detail) + if node == "" { + return fmt.Errorf("node must not be empty") + } + if reason == "" { + reason = "recovery" + } + now := time.Now().UTC() + deadline := now.Add(o.recoveryCordonMaxDuration()) + if err := o.annotateCordonLease(ctx, node, reason, detail, now, deadline); err != nil { + return err + } + if _, err := o.kubectl(ctx, 30*time.Second, "cordon", node); err != nil { + o.bestEffort("clear cordon lease after cordon failure", func() error { + return o.clearCordonLease(ctx, node) + }) + return err + } + o.log.Printf("cordoned node %s with ananke lease reason=%s deadline=%s", node, reason, deadline.Format(time.RFC3339)) + return nil +} + +// annotateCordonLease records why Ananke is cordoning a node and when it expires. +// Signature: (o *Orchestrator) annotateCordonLease(ctx context.Context, node, reason, detail string, created, deadline time.Time) error. +// Why: the annotations are the durable handoff between startup recovery and the +// daemon's post-start reconciliation loop. +func (o *Orchestrator) annotateCordonLease(ctx context.Context, node string, reason string, detail string, created time.Time, deadline time.Time) error { + args := []string{ + "annotate", "node", node, + anankeCordonOwnerAnnotation + "=ananke", + anankeCordonReasonAnnotation + "=" + reason, + anankeCordonDetailAnnotation + "=" + detail, + anankeCordonCreatedAnnotation + "=" + created.Format(time.RFC3339), + anankeCordonDeadlineAnnotation + "=" + deadline.Format(time.RFC3339), + "--overwrite", + } + if _, err := o.kubectl(ctx, 20*time.Second, args...); err != nil { + return fmt.Errorf("annotate cordon lease on %s: %w", node, err) + } + return nil +} + +// clearCordonLease removes Ananke's cordon ownership markers from a node. +// Signature: (o *Orchestrator) clearCordonLease(ctx context.Context, node string) error. +// Why: once a node is safe to schedule again, stale lease annotations should not +// keep paging operators or confusing later recovery loops. +func (o *Orchestrator) clearCordonLease(ctx context.Context, node string) error { + args := []string{ + "annotate", "node", node, + anankeCordonOwnerAnnotation + "-", + anankeCordonReasonAnnotation + "-", + anankeCordonDetailAnnotation + "-", + anankeCordonCreatedAnnotation + "-", + anankeCordonDeadlineAnnotation + "-", + anankeCordonManualActionAnnotation + "-", + anankeCordonManualAtAnnotation + "-", + "--overwrite", + } + if _, err := o.kubectl(ctx, 20*time.Second, args...); err != nil && !isNotFoundErr(err) { + return fmt.Errorf("clear cordon lease on %s: %w", node, err) + } + return nil +} + +// uncordonAndClearCordonLease returns a recovered node to service. +// Signature: (o *Orchestrator) uncordonAndClearCordonLease(ctx context.Context, node, reason string) error. +// Why: the uncordon and annotation cleanup should happen as one logical recovery +// action so leased cordons do not linger after the hazard is gone. +func (o *Orchestrator) uncordonAndClearCordonLease(ctx context.Context, node string, reason string) error { + if _, err := o.kubectl(ctx, 30*time.Second, "uncordon", node); err != nil { + return fmt.Errorf("uncordon %s after %s recovery: %w", node, reason, err) + } + if err := o.clearCordonLease(ctx, node); err != nil { + o.log.Printf("warning: node %s uncordoned but lease cleanup failed: %v", node, err) + } + o.log.Printf("uncordoned node %s after recovering leased cordon reason=%s", node, reason) + o.noteStartupAutoHeal(fmt.Sprintf("uncordoned %s after recovering %s", node, reason)) + return nil +} + +// enforceRecoveryCordonLeases reconciles Ananke-created cordons and reports stale +// manual cordons. +// Signature: (o *Orchestrator) enforceRecoveryCordonLeases(ctx context.Context) (int, error). +// Why: automatic recovery can temporarily cordon a node, but it must either clear +// that cordon or tell the operator before the resource is stranded. +func (o *Orchestrator) enforceRecoveryCordonLeases(ctx context.Context) (int, error) { + nodes, err := o.queryReadyNodes(ctx) + if err != nil { + return 0, err + } + maxAge := o.recoveryCordonMaxDuration() + now := time.Now().UTC() + released := 0 + errs := []string{} + for _, node := range nodes.Items { + name := strings.TrimSpace(node.Metadata.Name) + if name == "" { + continue + } + ann := node.Metadata.Annotations + if !node.Spec.Unschedulable { + if ann[anankeCordonOwnerAnnotation] == "ananke" { + if err := o.clearCordonLease(ctx, name); err != nil { + errs = append(errs, err.Error()) + } + } + continue + } + if ann[anankeCordonOwnerAnnotation] != "ananke" { + cordonTime := nodeUnschedulableSince(node) + if !cordonTime.IsZero() && now.Sub(cordonTime) > maxAge { + message := fmt.Sprintf("manual action required: node %s has an unowned cordon older than %s; Ananke will not silently leave resources stranded or override a non-Ananke cordon", name, maxAge) + o.markManualActionRequired(ctx, name, message) + errs = append(errs, message) + } + continue + } + + recovered, recoverErr := o.recoverLeasedCordon(ctx, name, ann) + if recovered { + released++ + continue + } + if recoverErr == nil { + continue + } + expired, deadlineText := cordonLeaseExpired(ann, now) + if expired { + message := fmt.Sprintf("manual action required: node %s remains cordoned after Ananke repair lease expired at %s: %v", name, deadlineText, recoverErr) + o.markManualActionRequired(ctx, name, message) + errs = append(errs, message) + } else { + o.log.Printf("leased cordon on %s still pending repair before %s: %v", name, deadlineText, recoverErr) + } + } + if len(errs) > 0 { + return released, errors.New(strings.Join(errs, "; ")) + } + return released, nil +} + +// recoverLeasedCordon tries the reason-specific repair for an Ananke cordon. +// Signature: (o *Orchestrator) recoverLeasedCordon(ctx context.Context, node string, annotations map[string]string) (bool, error). +// Why: each cordon reason has a different safety gate; a node only comes back when +// the original hazard is known to be gone. +func (o *Orchestrator) recoverLeasedCordon(ctx context.Context, node string, annotations map[string]string) (bool, error) { + reason := strings.TrimSpace(annotations[anankeCordonReasonAnnotation]) + switch reason { + case cordonReasonMissingCryptsetup: + if err := o.ensureHostCryptsetup(ctx, node); err != nil { + return false, fmt.Errorf("cryptsetup repair still failing: %w", err) + } + if err := o.uncordonAndClearCordonLease(ctx, node, reason); err != nil { + return false, err + } + return true, nil + case cordonReasonRuntimeWedge: + stillWedged, err := o.nodeStillHasRuntimeWedge(ctx, node) + if err != nil { + return false, err + } + if stillWedged { + return false, fmt.Errorf("container runtime wedge evidence is still present") + } + if err := o.uncordonAndClearCordonLease(ctx, node, reason); err != nil { + return false, err + } + return true, nil + case cordonReasonKubeletProxy: + healthy, err := o.kubeletProxyHealthy(ctx, node) + if !healthy { + return false, fmt.Errorf("kubelet proxy still unhealthy: %v", err) + } + if err := o.uncordonAndClearCordonLease(ctx, node, reason); err != nil { + return false, err + } + return true, nil + default: + return false, fmt.Errorf("unknown Ananke cordon reason %q", reason) + } +} + +// nodeStillHasRuntimeWedge checks whether the original runtime-wedge symptom remains. +// Signature: (o *Orchestrator) nodeStillHasRuntimeWedge(ctx context.Context, node string) (bool, error). +// Why: a runtime-wedge cordon can be released once the cluster no longer shows +// repeated controller-owned, non-PVC pod start failures on that node. +func (o *Orchestrator) nodeStillHasRuntimeWedge(ctx context.Context, node string) (bool, error) { + out, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json") + if err != nil { + return false, fmt.Errorf("query pods for runtime-wedge recovery: %w", err) + } + var list podList + if err := json.Unmarshal([]byte(out), &list); err != nil { + return false, fmt.Errorf("decode pods for runtime-wedge recovery: %w", err) + } + grace := time.Duration(o.cfg.Startup.StuckPodGraceSeconds) * time.Second + if grace <= 0 { + grace = 180 * time.Second + } + reasons, err := o.containerRuntimeWedgePodReasons(ctx, list, grace) + if err != nil { + return false, err + } + for _, pod := range list.Items { + ns := strings.TrimSpace(pod.Metadata.Namespace) + name := strings.TrimSpace(pod.Metadata.Name) + if ns == "" || name == "" || strings.TrimSpace(pod.Spec.NodeName) != node { + continue + } + if reasons[ns+"/"+name] == "" { + continue + } + if podUsesPersistentVolumeClaim(pod) || !podControllerOwned(pod) { + continue + } + return true, nil + } + return false, nil +} + +// queryReadyNodes fetches nodes using the shared light-weight node shape. +// Signature: (o *Orchestrator) queryReadyNodes(ctx context.Context) (nodeReadyList, error). +// Why: lease checks need annotations and taint timestamps in addition to Ready state. +func (o *Orchestrator) queryReadyNodes(ctx context.Context) (nodeReadyList, error) { + out, err := o.kubectl(ctx, 20*time.Second, "get", "nodes", "-o", "json") + if err != nil { + return nodeReadyList{}, fmt.Errorf("query nodes: %w", err) + } + var nodes nodeReadyList + if err := json.Unmarshal([]byte(out), &nodes); err != nil { + return nodeReadyList{}, fmt.Errorf("decode nodes: %w", err) + } + return nodes, nil +} + +// markManualActionRequired makes a stale cordon visible on the node object itself. +// Signature: (o *Orchestrator) markManualActionRequired(ctx context.Context, node, message string). +// Why: daemon logs are easy to miss during recovery; the node should carry the +// operator-facing action that prevented automatic release. +func (o *Orchestrator) markManualActionRequired(ctx context.Context, node string, message string) { + now := time.Now().UTC().Format(time.RFC3339) + message = sanitizeCordonAnnotationValue(message) + if _, err := o.kubectl( + ctx, + 20*time.Second, + "annotate", "node", node, + anankeCordonManualActionAnnotation+"="+message, + anankeCordonManualAtAnnotation+"="+now, + "--overwrite", + ); err != nil { + o.log.Printf("warning: mark manual action required on %s failed: %v", node, err) + } +} + +// recoveryCordonMaxDuration returns the maximum allowed automatic cordon lease. +// Signature: (o *Orchestrator) recoveryCordonMaxDuration() time.Duration. +// Why: all recovery cordon decisions should share the same operator promise. +func (o *Orchestrator) recoveryCordonMaxDuration() time.Duration { + seconds := o.cfg.Startup.RecoveryCordonMaxSeconds + if seconds <= 0 { + seconds = 3600 + } + return time.Duration(seconds) * time.Second +} + +// cordonLeaseExpired reports whether a stored cordon lease is past deadline. +// Signature: cordonLeaseExpired(annotations map[string]string, now time.Time) (bool, string). +// Why: malformed or missing deadlines should fail closed so Ananke does not +// treat an indefinite cordon as healthy automation. +func cordonLeaseExpired(annotations map[string]string, now time.Time) (bool, string) { + rawDeadline := strings.TrimSpace(annotations[anankeCordonDeadlineAnnotation]) + if rawDeadline == "" { + return true, "missing deadline" + } + deadline, err := time.Parse(time.RFC3339, rawDeadline) + if err != nil { + return true, rawDeadline + } + return !now.Before(deadline), deadline.Format(time.RFC3339) +} + +// nodeUnschedulableSince reads the timestamp Kubernetes recorded for a cordon. +// Signature: nodeUnschedulableSince(node nodeReadyItem) time.Time. +// Why: unowned cordons need an age check before Ananke escalates them to manual +// action. +func nodeUnschedulableSince(node nodeReadyItem) time.Time { + for _, taint := range node.Spec.Taints { + if taint.Key == "node.kubernetes.io/unschedulable" { + return taint.TimeAdded + } + } + return time.Time{} +} + +// sanitizeCordonAnnotationValue keeps node annotation values compact. +// Signature: sanitizeCordonAnnotationValue(value string) string. +// Why: recovery details are operator-facing, but Kubernetes annotation values +// should stay short enough to remain readable in node listings. +func sanitizeCordonAnnotationValue(value string) string { + value = strings.Join(strings.Fields(strings.TrimSpace(value)), " ") + if len(value) <= 240 { + return value + } + return value[:240] +} diff --git a/internal/cluster/orchestrator_cordon_lease_test.go b/internal/cluster/orchestrator_cordon_lease_test.go new file mode 100644 index 0000000..81d2795 --- /dev/null +++ b/internal/cluster/orchestrator_cordon_lease_test.go @@ -0,0 +1,143 @@ +package cluster + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "scm.bstein.dev/bstein/ananke/internal/config" +) + +// TestEnforceRecoveryCordonLeasesUncordonsRecoveredCryptsetupNode runs one orchestration or CLI step. +// Signature: TestEnforceRecoveryCordonLeasesUncordonsRecoveredCryptsetupNode(t *testing.T). +// Why: a cryptsetup safety cordon should clear automatically once the host +// prerequisite is actually present. +func TestEnforceRecoveryCordonLeasesUncordonsRecoveredCryptsetupNode(t *testing.T) { + now := time.Now().UTC() + nodeJSON := leasedNodeJSON( + "titan-15", + cordonReasonMissingCryptsetup, + now.Add(-10*time.Minute), + now.Add(50*time.Minute), + ) + uncordoned := false + cleared := false + orch := buildOrchestratorWithStubs(t, config.Config{ + Startup: config.Startup{RecoveryCordonMaxSeconds: 3600}, + }, []commandStub{ + {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: nodeJSON}, + {match: matchContains("ssh", "cryptsetup"), out: "__ANANKE_CRYPTSETUP_PRESENT__"}, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "uncordon", "titan-15")(name, args) { + return false + } + uncordoned = true + return true + }, + }, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "annotate", "node", "titan-15", anankeCordonOwnerAnnotation+"-")(name, args) { + return false + } + cleared = true + return true + }, + }, + }) + + released, err := orch.enforceRecoveryCordonLeases(context.Background()) + if err != nil { + t.Fatalf("enforceRecoveryCordonLeases failed: %v", err) + } + if released != 1 || !uncordoned || !cleared { + t.Fatalf("expected recovered cordon release, released=%d uncordoned=%v cleared=%v", released, uncordoned, cleared) + } +} + +// TestEnforceRecoveryCordonLeasesReportsStaleUnownedCordon runs one orchestration or CLI step. +// Signature: TestEnforceRecoveryCordonLeasesReportsStaleUnownedCordon(t *testing.T). +// Why: Ananke must not overwrite a manual cordon, but it also must not let it sit +// silently beyond the recovery lease window. +func TestEnforceRecoveryCordonLeasesReportsStaleUnownedCordon(t *testing.T) { + old := time.Now().UTC().Add(-2 * time.Hour).Format(time.RFC3339) + nodeJSON := `{"items":[{"metadata":{"name":"titan-18","annotations":{}},"spec":{"unschedulable":true,"taints":[{"key":"node.kubernetes.io/unschedulable","timeAdded":"` + old + `"}]},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}` + manualMarked := false + orch := buildOrchestratorWithStubs(t, config.Config{ + Startup: config.Startup{RecoveryCordonMaxSeconds: 3600}, + }, []commandStub{ + {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: nodeJSON}, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "annotate", "node", "titan-18", anankeCordonManualActionAnnotation+"=")(name, args) { + return false + } + manualMarked = true + return true + }, + }, + }) + + released, err := orch.enforceRecoveryCordonLeases(context.Background()) + if released != 0 || err == nil || !strings.Contains(err.Error(), "unowned cordon") { + t.Fatalf("expected stale unowned cordon error, released=%d err=%v", released, err) + } + if !manualMarked { + t.Fatalf("expected manual-action annotation") + } +} + +// TestEnforceRecoveryCordonLeasesEscalatesExpiredCryptsetupRepair runs one orchestration or CLI step. +// Signature: TestEnforceRecoveryCordonLeasesEscalatesExpiredCryptsetupRepair(t *testing.T). +// Why: after the lease expires, a failed automatic repair must become a clear +// manual action rather than another quiet retry. +func TestEnforceRecoveryCordonLeasesEscalatesExpiredCryptsetupRepair(t *testing.T) { + now := time.Now().UTC() + nodeJSON := leasedNodeJSON( + "titan-17", + cordonReasonMissingCryptsetup, + now.Add(-2*time.Hour), + now.Add(-time.Hour), + ) + manualMarked := false + orch := buildOrchestratorWithStubs(t, config.Config{ + Startup: config.Startup{RecoveryCordonMaxSeconds: 3600}, + }, []commandStub{ + {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: nodeJSON}, + {match: matchContains("ssh", "cryptsetup"), err: errors.New("sudo rejected")}, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "annotate", "node", "titan-17", anankeCordonManualActionAnnotation+"=")(name, args) { + return false + } + manualMarked = true + return true + }, + }, + }) + + released, err := orch.enforceRecoveryCordonLeases(context.Background()) + if released != 0 || err == nil || !strings.Contains(err.Error(), "manual action required") { + t.Fatalf("expected expired lease manual action, released=%d err=%v", released, err) + } + if !manualMarked { + t.Fatalf("expected manual-action annotation") + } +} + +// leasedNodeJSON builds a minimal node-list payload with an Ananke cordon lease. +// Signature: leasedNodeJSON(node, reason string, created, deadline time.Time) string. +// Why: the lease tests only need node annotations, cordon state, and Ready +// condition, so a small fixture keeps intent clear. +func leasedNodeJSON(node string, reason string, created time.Time, deadline time.Time) string { + return `{"items":[{"metadata":{"name":"` + node + `","annotations":{"` + + anankeCordonOwnerAnnotation + `":"ananke","` + + anankeCordonReasonAnnotation + `":"` + reason + `","` + + anankeCordonCreatedAnnotation + `":"` + created.Format(time.RFC3339) + `","` + + anankeCordonDeadlineAnnotation + `":"` + deadline.Format(time.RFC3339) + + `"}},"spec":{"unschedulable":true,"taints":[{"key":"node.kubernetes.io/unschedulable","timeAdded":"` + created.Format(time.RFC3339) + + `"}]},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}` +} diff --git a/internal/cluster/orchestrator_critical_endpoint_additional_test.go b/internal/cluster/orchestrator_critical_endpoint_additional_test.go new file mode 100644 index 0000000..fd7a14b --- /dev/null +++ b/internal/cluster/orchestrator_critical_endpoint_additional_test.go @@ -0,0 +1,99 @@ +package cluster + +import ( + "context" + "errors" + "io" + "log" + "path/filepath" + "strings" + "testing" + "time" + + "scm.bstein.dev/bstein/ananke/internal/config" + "scm.bstein.dev/bstein/ananke/internal/execx" + "scm.bstein.dev/bstein/ananke/internal/state" +) + +// TestCriticalEndpointHelpers runs one orchestration or CLI step. +// Signature: TestCriticalEndpointHelpers(t *testing.T). +// Why: covers critical endpoint parsing and readiness checks that gate startup completion. +func TestCriticalEndpointHelpers(t *testing.T) { + cfg := config.Config{ + Startup: config.Startup{ + CriticalServiceEndpoints: []string{"monitoring/victoria-metrics-single-server"}, + }, + } + orch := buildOrchestratorWithStubs(t, cfg, []commandStub{ + {match: matchContains("kubectl", "get endpoints victoria-metrics-single-server"), out: "10.42.0.10\n10.42.0.11\n"}, + }) + ok, detail, ns, svc, err := orch.criticalServiceEndpointsReady(context.Background()) + if err != nil || !ok { + t.Fatalf("expected criticalServiceEndpointsReady success, got ok=%v detail=%q ns=%q svc=%q err=%v", ok, detail, ns, svc, err) + } + if detail != "services=1" { + t.Fatalf("unexpected readiness detail: %q", detail) + } + gotNS, gotSvc, err := parseCriticalServiceEndpoint("monitoring/victoria-metrics-single-server") + if err != nil || gotNS != "monitoring" || gotSvc != "victoria-metrics-single-server" { + t.Fatalf("unexpected parse result ns=%q svc=%q err=%v", gotNS, gotSvc, err) + } + if _, _, err := parseCriticalServiceEndpoint("invalid"); err == nil { + t.Fatalf("expected parseCriticalServiceEndpoint error") + } +} + +// TestCriticalEndpointAutoHealWorkflow runs one orchestration or CLI step. +// Signature: TestCriticalEndpointAutoHealWorkflow(t *testing.T). +// Why: covers endpoint-zero recovery where startup heals workload replicas before succeeding. +func TestCriticalEndpointAutoHealWorkflow(t *testing.T) { + cfg := config.Config{ + Startup: config.Startup{ + CriticalServiceEndpointWaitSec: 2, + CriticalServiceEndpointPollSec: 1, + CriticalServiceEndpoints: []string{"monitoring/victoria-metrics-single-server"}, + }, + State: config.State{ + Dir: t.TempDir(), + ReportsDir: filepath.Join(t.TempDir(), "reports"), + RunHistoryPath: filepath.Join(t.TempDir(), "runs.json"), + }, + } + orch := &Orchestrator{ + cfg: cfg, + runner: &execx.Runner{}, + store: state.New(cfg.State.RunHistoryPath), + log: log.New(io.Discard, "", 0), + } + + endpointChecks := 0 + dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) { + joined := name + " " + strings.Join(args, " ") + if strings.Contains(joined, "get endpoints victoria-metrics-single-server") { + endpointChecks++ + if endpointChecks == 1 { + return "", nil + } + return "10.42.0.10\n", nil + } + if strings.Contains(joined, "scale deployment victoria-metrics-single-server") { + return "", errors.New(`Error from server (NotFound): deployments.apps "victoria-metrics-single-server" not found`) + } + if strings.Contains(joined, "scale statefulset victoria-metrics-single-server") { + return "", nil + } + if strings.Contains(joined, "rollout status statefulset/victoria-metrics-single-server") { + return "statefulset rolled out", nil + } + return "", nil + } + orch.runOverride = dispatch + orch.runSensitiveOverride = dispatch + + if err := orch.waitForCriticalServiceEndpoints(context.Background()); err != nil { + t.Fatalf("waitForCriticalServiceEndpoints failed: %v", err) + } + if endpointChecks < 2 { + t.Fatalf("expected repeated endpoint checks, got %d", endpointChecks) + } +} diff --git a/internal/cluster/orchestrator_critical_vault.go b/internal/cluster/orchestrator_critical_vault.go index c4f5c66..33d37cb 100644 --- a/internal/cluster/orchestrator_critical_vault.go +++ b/internal/cluster/orchestrator_critical_vault.go @@ -7,7 +7,6 @@ import ( "fmt" "os" "path/filepath" - "strconv" "strings" "time" ) @@ -465,43 +464,3 @@ func (o *Orchestrator) readVaultUnsealKeyFile() (string, error) { } return key, nil } - -// workloadReady runs one orchestration or CLI step. -// Signature: (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error). -// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. -func (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error) { - out, err := o.kubectl( - ctx, - 20*time.Second, - "-n", - w.Namespace, - "get", - w.Kind, - w.Name, - "-o", - "jsonpath={.status.readyReplicas}", - ) - if err != nil { - return false, err - } - raw := strings.TrimSpace(out) - if raw == "" || raw == "" { - return false, nil - } - n, err := strconv.Atoi(raw) - if err != nil { - return false, fmt.Errorf("parse readyReplicas %q: %w", raw, err) - } - return n >= 1, nil -} - -// isNotFoundErr runs one orchestration or CLI step. -// Signature: isNotFoundErr(err error) bool. -// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. -func isNotFoundErr(err error) bool { - if err == nil { - return false - } - msg := strings.ToLower(err.Error()) - return strings.Contains(msg, "not found") || strings.Contains(msg, "(notfound)") -} diff --git a/internal/cluster/orchestrator_lifecycle.go b/internal/cluster/orchestrator_lifecycle.go index d413e77..05a58a6 100644 --- a/internal/cluster/orchestrator_lifecycle.go +++ b/internal/cluster/orchestrator_lifecycle.go @@ -446,75 +446,3 @@ func (o *Orchestrator) EtcdRestore(ctx context.Context, opts EtcdRestoreOptions) } return nil } - -// Shutdown runs one orchestration or CLI step. -// Signature: (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error). -// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. -func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error) { - unlock, err := state.AcquireLock(o.cfg.State.LockPath) - if err != nil { - return err - } - defer unlock() - if invErr := o.validateNodeInventory(); invErr != nil { - return invErr - } - - record := state.RunRecord{ - ID: fmt.Sprintf("shutdown-%d", time.Now().UnixNano()), - Action: "shutdown", - Reason: opts.Reason, - DryRun: o.runner.DryRun, - StartedAt: time.Now().UTC(), - } - defer o.finalizeRecord(&record, &err) - if !o.runner.DryRun { - if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentShuttingDown, opts.Reason, "shutdown"); writeErr != nil { - return fmt.Errorf("set shutdown intent: %w", writeErr) - } - defer func() { - final := state.IntentShuttingDown - if err == nil { - final = state.IntentShutdownComplete - } - if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, final, opts.Reason, "shutdown"); writeErr != nil { - o.log.Printf("warning: write shutdown completion intent failed: %v", writeErr) - } - }() - } - - workers, err := o.effectiveWorkers(ctx) - if err != nil { - return err - } - o.log.Printf("shutdown control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ",")) - - o.reportFluxSource(ctx, "") - - skipEtcd := opts.SkipEtcdSnapshot || o.cfg.Shutdown.SkipEtcdSnapshot - if !skipEtcd { - o.bestEffort("etcd snapshot", func() error { - return o.takeEtcdSnapshot(ctx, o.cfg.ControlPlanes[0]) - }) - } - - o.bestEffort("suspend flux", func() error { return o.patchFluxSuspendAll(ctx, true) }) - o.bestEffort("scale down apps", func() error { return o.scaleDownApps(ctx) }) - - skipDrain := opts.SkipDrain || o.cfg.Shutdown.SkipDrain - if !skipDrain { - o.bestEffort("drain workers", func() error { return o.drainWorkers(ctx, workers) }) - } - - shutdownMode := strings.TrimSpace(opts.Mode) - effectiveMode, modeErr := normalizeShutdownMode(shutdownMode) - if modeErr != nil { - return modeErr - } - o.log.Printf("shutdown execution mode=%s (requested=%q)", effectiveMode, shutdownMode) - - o.stopWorkers(ctx, workers) - o.stopControlPlanes(ctx, o.cfg.ControlPlanes) - o.log.Printf("shutdown flow complete") - return nil -} diff --git a/internal/cluster/orchestrator_longhorn_recovery.go b/internal/cluster/orchestrator_longhorn_recovery.go new file mode 100644 index 0000000..2639581 --- /dev/null +++ b/internal/cluster/orchestrator_longhorn_recovery.go @@ -0,0 +1,254 @@ +package cluster + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" +) + +// repairEncryptedVolumeMountPrereqs runs one orchestration or CLI step. +// Signature: (o *Orchestrator) repairEncryptedVolumeMountPrereqs(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error). +// Why: encrypted Longhorn volume mounts depend on host cryptsetup. After node +// rebuilds or partial OS recovery, Kubernetes may be ready while kubelet cannot +// mount encrypted PVCs; installing the missing host tool and recycling the +// controller-owned pod lets kubelet retry the same volume safely. +func (o *Orchestrator) repairEncryptedVolumeMountPrereqs(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) { + eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json") + if err != nil { + return nil, fmt.Errorf("query events for encrypted volume mount scan: %w", err) + } + var events eventList + if err := json.Unmarshal([]byte(eventsOut), &events); err != nil { + return nil, fmt.Errorf("decode events for encrypted volume mount scan: %w", err) + } + + podsByKey := map[string]podResource{} + for _, pod := range pods.Items { + ns := strings.TrimSpace(pod.Metadata.Namespace) + name := strings.TrimSpace(pod.Metadata.Name) + node := strings.TrimSpace(pod.Spec.NodeName) + if ns == "" || name == "" || node == "" { + continue + } + if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") { + continue + } + if !podControllerOwned(pod) { + continue + } + if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace { + continue + } + podsByKey[ns+"/"+name] = pod + } + if len(podsByKey) == 0 { + return map[string]string{}, nil + } + + repairedNodes := map[string]bool{} + reasons := map[string]string{} + for _, event := range events.Items { + if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") { + continue + } + if strings.TrimSpace(event.Reason) != "FailedMount" { + continue + } + if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") { + continue + } + key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name) + pod, ok := podsByKey[key] + if !ok { + continue + } + lastSeen := eventLastObservedAt(event) + if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && lastSeen.Before(pod.Metadata.CreationTimestamp) { + continue + } + message := strings.ToLower(strings.TrimSpace(event.Message)) + if !strings.Contains(message, "cryptsetup") || !strings.Contains(message, "no such file or directory") { + continue + } + node := strings.TrimSpace(pod.Spec.NodeName) + if node == "" || !o.sshManaged(node) { + o.log.Printf("warning: encrypted volume mount blocked on unmanaged node %s for pod %s", node, key) + continue + } + if repaired, ok := repairedNodes[node]; ok { + if repaired { + reasons[key] = "EncryptedVolumeCryptsetupRepaired:" + node + } + continue + } + if err := o.ensureHostCryptsetup(ctx, node); err != nil { + repairedNodes[node] = false + o.log.Printf("warning: cryptsetup prerequisite repair failed on %s for pod %s: %v", node, key, err) + if cordonErr := o.cordonNodeForMissingCryptsetup(ctx, node); cordonErr != nil { + o.log.Printf("warning: cordon failed after cryptsetup repair failure on %s for pod %s: %v", node, key, cordonErr) + continue + } + reasons[key] = "EncryptedVolumeCryptsetupNodeCordoned:" + node + continue + } + repairedNodes[node] = true + reasons[key] = "EncryptedVolumeCryptsetupRepaired:" + node + } + return reasons, nil +} + +// ensureHostCryptsetup runs one orchestration or CLI step. +// Signature: (o *Orchestrator) ensureHostCryptsetup(ctx context.Context, node string) error. +// Why: kubelet's encrypted Longhorn mount helper shells into the host namespace, +// so the package must exist on the node host, not merely inside a workload pod. +func (o *Orchestrator) ensureHostCryptsetup(ctx context.Context, node string) error { + command := strings.Join([]string{ + "set -eu", + "if command -v cryptsetup >/dev/null 2>&1 || [ -x /usr/sbin/cryptsetup ] || [ -x /usr/bin/cryptsetup ]; then echo __ANANKE_CRYPTSETUP_PRESENT__; exit 0; fi", + "if ! command -v apt-get >/dev/null 2>&1; then echo __ANANKE_CRYPTSETUP_NO_APT__; exit 42; fi", + "sudo -n env DEBIAN_FRONTEND=noninteractive apt-get update", + "sudo -n env DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends cryptsetup-bin", + "if command -v cryptsetup >/dev/null 2>&1 || [ -x /usr/sbin/cryptsetup ] || [ -x /usr/bin/cryptsetup ]; then echo __ANANKE_CRYPTSETUP_INSTALLED__; exit 0; fi", + "echo __ANANKE_CRYPTSETUP_INSTALL_FAILED__", + "exit 43", + }, "; ") + out, err := o.sshWithTimeout(ctx, node, command, 5*time.Minute) + if err != nil { + return fmt.Errorf("install cryptsetup-bin: %w (output=%s)", err, strings.TrimSpace(out)) + } + trimmed := strings.TrimSpace(out) + o.log.Printf("ensured cryptsetup prerequisite on %s: %s", node, trimmed) + if strings.Contains(trimmed, "__ANANKE_CRYPTSETUP_INSTALLED__") { + o.noteStartupAutoHeal(fmt.Sprintf("installed cryptsetup on %s", node)) + } + return nil +} + +// cordonNodeForMissingCryptsetup runs one orchestration or CLI step. +// Signature: (o *Orchestrator) cordonNodeForMissingCryptsetup(ctx context.Context, node string) error. +// Why: when host package repair is not permitted, cordoning is the safest +// automatic fallback: it prevents new encrypted-volume pods from landing on a +// node kubelet cannot mount from, while leaving existing workloads and storage +// objects untouched. +func (o *Orchestrator) cordonNodeForMissingCryptsetup(ctx context.Context, node string) error { + if err := o.cordonNodeWithLease(ctx, node, cordonReasonMissingCryptsetup, "encrypted Longhorn volume mount failed because host cryptsetup is missing"); err != nil { + return err + } + o.log.Printf("cordoned node %s after encrypted volume cryptsetup prerequisite failure", node) + o.noteStartupAutoHeal(fmt.Sprintf("cordoned %s after missing cryptsetup blocked encrypted volume mount", node)) + return nil +} + +// longhornAttachBlockedPodReasons runs one orchestration or CLI step. +// Signature: (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error). +// Why: after a power event, Kubernetes can schedule a Longhorn-backed pod onto a +// node Longhorn still marks unready. Recycling the unattached Pending pod lets +// the scheduler pick a Longhorn-ready node without touching Longhorn data-plane +// objects. +func (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) { + unreadyNodes, err := o.longhornUnreadyNodes(ctx) + if err != nil { + return nil, err + } + if len(unreadyNodes) == 0 { + return map[string]string{}, nil + } + + eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json") + if err != nil { + return nil, fmt.Errorf("query events for longhorn attach-blocked pod scan: %w", err) + } + var events eventList + if err := json.Unmarshal([]byte(eventsOut), &events); err != nil { + return nil, fmt.Errorf("decode events for longhorn attach-blocked pod scan: %w", err) + } + + podsByKey := map[string]podResource{} + for _, pod := range pods.Items { + ns := strings.TrimSpace(pod.Metadata.Namespace) + name := strings.TrimSpace(pod.Metadata.Name) + node := strings.TrimSpace(pod.Spec.NodeName) + if ns == "" || name == "" || node == "" { + continue + } + if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") { + continue + } + if _, unready := unreadyNodes[node]; !unready { + continue + } + if !podControllerOwned(pod) { + continue + } + if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace { + continue + } + podsByKey[ns+"/"+name] = pod + } + if len(podsByKey) == 0 { + return map[string]string{}, nil + } + + reasons := map[string]string{} + for _, event := range events.Items { + if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") { + continue + } + if strings.TrimSpace(event.Reason) != "FailedAttachVolume" { + continue + } + if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") { + continue + } + key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name) + pod, ok := podsByKey[key] + if !ok { + continue + } + lastSeen := eventLastObservedAt(event) + if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && lastSeen.Before(pod.Metadata.CreationTimestamp) { + continue + } + node := strings.TrimSpace(pod.Spec.NodeName) + message := strings.ToLower(strings.TrimSpace(event.Message)) + if !strings.Contains(message, "longhorn-backend") || !strings.Contains(message, "failed for volume") { + continue + } + if !strings.Contains(message, "node "+strings.ToLower(node)+" is not ready") { + continue + } + reasons[key] = "LonghornAttachBlockedOnUnreadyNode:" + node + } + return reasons, nil +} + +// longhornUnreadyNodes runs one orchestration or CLI step. +// Signature: (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error). +// Why: Longhorn node readiness can lag or intentionally differ from Kubernetes +// node readiness; attach recovery must use Longhorn's view for safety. +func (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error) { + out, err := o.kubectl(ctx, 30*time.Second, + "-n", "longhorn-system", + "get", "nodes.longhorn.io", + "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\t'}{range .status.conditions[?(@.type==\"Ready\")]}{.status}{end}{'\\n'}{end}", + ) + if err != nil { + if isNotFoundErr(err) { + return map[string]struct{}{}, nil + } + return nil, fmt.Errorf("query longhorn node readiness: %w", err) + } + unready := map[string]struct{}{} + for _, line := range lines(out) { + fields := strings.Fields(line) + if len(fields) < 2 { + continue + } + if !strings.EqualFold(strings.TrimSpace(fields[1]), "True") { + unready[strings.TrimSpace(fields[0])] = struct{}{} + } + } + return unready, nil +} diff --git a/internal/cluster/orchestrator_pod_recovery.go b/internal/cluster/orchestrator_pod_recovery.go new file mode 100644 index 0000000..abaa800 --- /dev/null +++ b/internal/cluster/orchestrator_pod_recovery.go @@ -0,0 +1,139 @@ +package cluster + +import ( + "context" + "strings" + "time" +) + +// staleControllerPodReasons runs one orchestration or CLI step. +// Signature: (o *Orchestrator) staleControllerPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error). +// Why: after node or kubelet recovery, controller-owned pods can stay in +// terminal or unknown status even though the node is Ready and a replacement may +// already be healthy. A normal pod delete lets Kubernetes clean the stale status +// without touching storage objects or forcing deletion on a partitioned node. +func (o *Orchestrator) staleControllerPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) { + unavailable, err := o.unavailableNodeSet(ctx) + if err != nil { + return nil, err + } + reasons := map[string]string{} + for _, pod := range pods.Items { + ns := strings.TrimSpace(pod.Metadata.Namespace) + name := strings.TrimSpace(pod.Metadata.Name) + node := strings.TrimSpace(pod.Spec.NodeName) + if ns == "" || name == "" || node == "" { + continue + } + phase := strings.TrimSpace(pod.Status.Phase) + if !strings.EqualFold(phase, "Unknown") && !strings.EqualFold(phase, "Failed") { + continue + } + if _, badNode := unavailable[node]; badNode { + continue + } + if !podControllerOwned(pod) { + continue + } + if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace { + continue + } + reasons[ns+"/"+name] = "StaleControllerPodOnReadyNode:" + node + ":" + phase + } + return reasons, nil +} + +// staleControllerPodForceDeleteSafe runs one orchestration or CLI step. +// Signature: staleControllerPodForceDeleteSafe(pod podResource, grace time.Duration) bool. +// Why: a stale pod already marked for deletion may need force removal after a +// node outage. Keep that fallback away from PVC-bearing pods so Ananke never +// risks duplicating a storage writer. +func staleControllerPodForceDeleteSafe(pod podResource, grace time.Duration) bool { + if pod.Metadata.DeletionTimestamp == nil { + return false + } + if time.Since(*pod.Metadata.DeletionTimestamp) < grace { + return false + } + if podUsesPersistentVolumeClaim(pod) { + return false + } + return true +} + +// podUsesPersistentVolumeClaim runs one orchestration or CLI step. +// Signature: podUsesPersistentVolumeClaim(pod podResource) bool. +// Why: force-delete recovery is deliberately disallowed for pods with PVCs; the +// scheduler and storage controller need to settle those normally. +func podUsesPersistentVolumeClaim(pod podResource) bool { + for _, volume := range pod.Spec.Volumes { + if volume.PersistentVolumeClaim != nil && strings.TrimSpace(volume.PersistentVolumeClaim.ClaimName) != "" { + return true + } + } + return false +} + +// podControllerOwned runs one orchestration or CLI step. +// Signature: podControllerOwned(p podResource) bool. +// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. +func podControllerOwned(p podResource) bool { + for _, owner := range p.Metadata.OwnerReferences { + switch strings.TrimSpace(owner.Kind) { + case "ReplicaSet", "StatefulSet", "DaemonSet", "Job": + return true + } + } + return false +} + +// stuckContainerReason runs one orchestration or CLI step. +// Signature: stuckContainerReason(p podResource, reasons map[string]struct{}) string. +// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. +func stuckContainerReason(p podResource, reasons map[string]struct{}) string { + check := func(statuses []podContainerStatus) string { + for _, st := range statuses { + if st.State.Waiting == nil { + continue + } + reason := strings.TrimSpace(st.State.Waiting.Reason) + if reason == "" { + continue + } + if _, ok := reasons[reason]; ok { + return reason + } + } + return "" + } + if reason := check(p.Status.InitContainerStatuses); reason != "" { + return reason + } + return check(p.Status.ContainerStatuses) +} + +// stuckVaultInitReason runs one orchestration or CLI step. +// Signature: stuckVaultInitReason(p podResource, grace time.Duration) string. +// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. +func stuckVaultInitReason(p podResource, grace time.Duration) string { + if !strings.EqualFold(strings.TrimSpace(p.Status.Phase), "Pending") { + return "" + } + if !strings.EqualFold(strings.TrimSpace(p.Metadata.Annotations["vault.hashicorp.com/agent-inject"]), "true") { + return "" + } + for _, st := range p.Status.InitContainerStatuses { + if strings.TrimSpace(st.Name) != "vault-agent-init" || st.State.Running == nil { + continue + } + startedAt := st.State.Running.StartedAt + if startedAt.IsZero() { + continue + } + if time.Since(startedAt) < grace { + return "" + } + return "VaultInitStuck" + } + return "" +} diff --git a/internal/cluster/orchestrator_shutdown.go b/internal/cluster/orchestrator_shutdown.go new file mode 100644 index 0000000..bd8aadd --- /dev/null +++ b/internal/cluster/orchestrator_shutdown.go @@ -0,0 +1,82 @@ +package cluster + +import ( + "context" + "fmt" + "strings" + "time" + + "scm.bstein.dev/bstein/ananke/internal/state" +) + +// Shutdown runs one orchestration or CLI step. +// Signature: (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error). +// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. +func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error) { + unlock, err := state.AcquireLock(o.cfg.State.LockPath) + if err != nil { + return err + } + defer unlock() + if invErr := o.validateNodeInventory(); invErr != nil { + return invErr + } + + record := state.RunRecord{ + ID: fmt.Sprintf("shutdown-%d", time.Now().UnixNano()), + Action: "shutdown", + Reason: opts.Reason, + DryRun: o.runner.DryRun, + StartedAt: time.Now().UTC(), + } + defer o.finalizeRecord(&record, &err) + if !o.runner.DryRun { + if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentShuttingDown, opts.Reason, "shutdown"); writeErr != nil { + return fmt.Errorf("set shutdown intent: %w", writeErr) + } + defer func() { + final := state.IntentShuttingDown + if err == nil { + final = state.IntentShutdownComplete + } + if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, final, opts.Reason, "shutdown"); writeErr != nil { + o.log.Printf("warning: write shutdown completion intent failed: %v", writeErr) + } + }() + } + + workers, err := o.effectiveWorkers(ctx) + if err != nil { + return err + } + o.log.Printf("shutdown control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ",")) + + o.reportFluxSource(ctx, "") + + skipEtcd := opts.SkipEtcdSnapshot || o.cfg.Shutdown.SkipEtcdSnapshot + if !skipEtcd { + o.bestEffort("etcd snapshot", func() error { + return o.takeEtcdSnapshot(ctx, o.cfg.ControlPlanes[0]) + }) + } + + o.bestEffort("suspend flux", func() error { return o.patchFluxSuspendAll(ctx, true) }) + o.bestEffort("scale down apps", func() error { return o.scaleDownApps(ctx) }) + + skipDrain := opts.SkipDrain || o.cfg.Shutdown.SkipDrain + if !skipDrain { + o.bestEffort("drain workers", func() error { return o.drainWorkers(ctx, workers) }) + } + + shutdownMode := strings.TrimSpace(opts.Mode) + effectiveMode, modeErr := normalizeShutdownMode(shutdownMode) + if modeErr != nil { + return modeErr + } + o.log.Printf("shutdown execution mode=%s (requested=%q)", effectiveMode, shutdownMode) + + o.stopWorkers(ctx, workers) + o.stopControlPlanes(ctx, o.cfg.ControlPlanes) + o.log.Printf("shutdown flow complete") + return nil +} diff --git a/internal/cluster/orchestrator_unit_additional_test.go b/internal/cluster/orchestrator_unit_additional_test.go index f03a562..355feec 100644 --- a/internal/cluster/orchestrator_unit_additional_test.go +++ b/internal/cluster/orchestrator_unit_additional_test.go @@ -141,453 +141,6 @@ func TestStartupEarlyFailureLeavesFluxSuspensionUnchanged(t *testing.T) { } } -// TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods runs one orchestration or CLI step. -// Signature: TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T). -// Why: Pending Longhorn-backed pods on Longhorn-unready nodes should be -// rescheduled without mutating Longhorn volume, replica, or disk objects. -func TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T) { - created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) - lastSeen := time.Now().UTC().Format(time.RFC3339) - pods := `{"items":[{"metadata":{"namespace":"monitoring","name":"victoria-metrics-single-server-0","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"StatefulSet","name":"victoria-metrics-single-server"}]},"spec":{"nodeName":"titan-0b"},"status":{"phase":"Pending"}}]}` - events := `{"items":[{"metadata":{"namespace":"monitoring","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"victoria-metrics-single-server-0"},"type":"Warning","reason":"FailedAttachVolume","message":"AttachVolume.Attach failed for volume \"pvc-1\" : rpc error from [http://longhorn-backend:9500/v1/volumes/pvc-1?action=attach]: unable to attach volume pvc-1 to titan-0b: node titan-0b is not ready","lastTimestamp":"` + lastSeen + `"}]}` - - deleted := false - orch := buildOrchestratorWithStubs(t, config.Config{ - Startup: config.Startup{StuckPodGraceSeconds: 180}, - }, []commandStub{ - {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, - {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-0b\tFalse\n"}, - {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events}, - {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-0b"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`}, - { - match: func(name string, args []string) bool { - if !matchContains("kubectl", "-n", "monitoring", "delete", "pod", "victoria-metrics-single-server-0", "--wait=false")(name, args) { - return false - } - deleted = true - return true - }, - }, - }) - - if err := orch.recycleStuckControllerPods(context.Background()); err != nil { - t.Fatalf("recycleStuckControllerPods failed: %v", err) - } - if !deleted { - t.Fatalf("expected longhorn attach-blocked pending pod to be recycled") - } -} - -// TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup runs one orchestration or CLI step. -// Signature: TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup(t *testing.T). -// Why: encrypted Longhorn PVC recovery should repair missing host cryptsetup and -// then recycle the blocked pod without touching Longhorn data-plane objects. -func TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup(t *testing.T) { - created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) - lastSeen := time.Now().UTC().Format(time.RFC3339) - pods := `{"items":[{"metadata":{"namespace":"finance","name":"actual-budget-abc","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"ReplicaSet","name":"actual-budget"}]},"spec":{"nodeName":"titan-19"},"status":{"phase":"Pending"}}]}` - events := `{"items":[{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"actual-budget-abc"},"type":"Warning","reason":"FailedMount","message":"MountVolume.MountDevice failed for volume \"pvc-1\" : nsenter: failed to execute cryptsetup: No such file or directory","lastTimestamp":"` + lastSeen + `"}]}` - - installed := false - deleted := false - orch := buildOrchestratorWithStubs(t, config.Config{ - Startup: config.Startup{StuckPodGraceSeconds: 180}, - }, []commandStub{ - {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, - {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-19\tTrue\n"}, - {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events}, - {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`}, - { - match: func(name string, args []string) bool { - if name != "ssh" || !strings.Contains(strings.Join(args, " "), "apt-get install -y --no-install-recommends cryptsetup-bin") { - return false - } - installed = true - return true - }, - out: "__ANANKE_CRYPTSETUP_INSTALLED__", - }, - { - match: func(name string, args []string) bool { - if !matchContains("kubectl", "-n", "finance", "delete", "pod", "actual-budget-abc", "--wait=false")(name, args) { - return false - } - deleted = true - return true - }, - }, - }) - - if err := orch.recycleStuckControllerPods(context.Background()); err != nil { - t.Fatalf("recycleStuckControllerPods failed: %v", err) - } - if !installed { - t.Fatalf("expected missing host cryptsetup to be installed") - } - if !deleted { - t.Fatalf("expected encrypted-volume blocked pod to be recycled") - } -} - -// TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails runs one orchestration or CLI step. -// Signature: TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails(t *testing.T). -// Why: when host package repair is blocked by sudo policy, Ananke should avoid -// the bad node and retry the controller-owned pod elsewhere. -func TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails(t *testing.T) { - created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) - lastSeen := time.Now().UTC().Format(time.RFC3339) - pods := `{"items":[{"metadata":{"namespace":"finance","name":"actual-budget-abc","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"ReplicaSet","name":"actual-budget"}]},"spec":{"nodeName":"titan-19"},"status":{"phase":"Pending"}}]}` - events := `{"items":[{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"actual-budget-abc"},"type":"Warning","reason":"FailedMount","message":"MountVolume.MountDevice failed for volume \"pvc-1\" : nsenter: failed to execute cryptsetup: No such file or directory","lastTimestamp":"` + lastSeen + `"}]}` - - cordoned := false - deleted := false - orch := buildOrchestratorWithStubs(t, config.Config{ - Startup: config.Startup{StuckPodGraceSeconds: 180}, - }, []commandStub{ - {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, - {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-19\tTrue\n"}, - {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events}, - {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`}, - { - match: matchContains("ssh", "apt-get install -y --no-install-recommends cryptsetup-bin"), - err: errors.New("sudo: a password is required"), - }, - { - match: func(name string, args []string) bool { - if !matchContains("kubectl", "cordon", "titan-19")(name, args) { - return false - } - cordoned = true - return true - }, - }, - { - match: func(name string, args []string) bool { - if !matchContains("kubectl", "-n", "finance", "delete", "pod", "actual-budget-abc", "--wait=false")(name, args) { - return false - } - deleted = true - return true - }, - }, - }) - - if err := orch.recycleStuckControllerPods(context.Background()); err != nil { - t.Fatalf("recycleStuckControllerPods failed: %v", err) - } - if !cordoned { - t.Fatalf("expected cryptsetup-missing node to be cordoned") - } - if !deleted { - t.Fatalf("expected encrypted-volume blocked pod to be recycled") - } -} - -// TestRecycleStuckControllerPodsHandlesStalePodsOnReadyNodes runs one orchestration or CLI step. -// Signature: TestRecycleStuckControllerPodsHandlesStalePodsOnReadyNodes(t *testing.T). -// Why: post-outage controller pods can remain Unknown or Failed after their -// node recovers; deletion clears stale status while force deletion stays away -// from PVC-backed storage. -func TestRecycleStuckControllerPodsHandlesStalePodsOnReadyNodes(t *testing.T) { - old := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) - recent := time.Now().Add(-30 * time.Second).UTC().Format(time.RFC3339) - pods := `{"items":[` + - `{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-old","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}},` + - `{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-failed","creationTimestamp":"` + old + `","deletionTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12","volumes":[{"name":"secret"}]},"status":{"phase":"Failed"}},` + - `{"metadata":{"namespace":"logging","name":"oauth2-proxy-terminating","creationTimestamp":"` + old + `","deletionTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"oauth2-proxy-logs"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"secret"}]},"status":{"phase":"Running"}},` + - `{"metadata":{"namespace":"longhorn-system","name":"pvc-backed-failed","creationTimestamp":"` + old + `","deletionTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"pvc-backed"}]},"spec":{"nodeName":"titan-12","volumes":[{"name":"data","persistentVolumeClaim":{"claimName":"data"}}]},"status":{"phase":"Failed"}},` + - `{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-fresh","creationTimestamp":"` + recent + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}},` + - `{"metadata":{"namespace":"maintenance","name":"stale-on-bad-node","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"maintenance"}]},"spec":{"nodeName":"titan-22"},"status":{"phase":"Unknown"}},` + - `{"metadata":{"namespace":"default","name":"bare-pod","creationTimestamp":"` + old + `"},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}}]}` - - deleted := []string{} - orch := buildOrchestratorWithStubs(t, config.Config{ - Startup: config.Startup{StuckPodGraceSeconds: 180}, - }, []commandStub{ - {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, - {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-12\tTrue\ntitan-22\tTrue\n"}, - {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: `{"items":[]}`}, - {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-12"},"status":{"conditions":[{"type":"Ready","status":"True"}]}},{"metadata":{"name":"titan-22"},"status":{"conditions":[{"type":"Ready","status":"False"}]}}]}`}, - { - match: func(name string, args []string) bool { - if !matchContains("kubectl", "-n", "longhorn-system", "delete", "pod", "longhorn-vault-sync-old", "--wait=false")(name, args) { - return false - } - deleted = append(deleted, "longhorn-vault-sync-old") - return true - }, - }, - { - match: func(name string, args []string) bool { - if !matchContains("kubectl", "-n", "longhorn-system", "delete", "pod", "longhorn-vault-sync-failed", "--wait=false", "--grace-period=0", "--force")(name, args) { - return false - } - deleted = append(deleted, "longhorn-vault-sync-failed") - return true - }, - }, - { - match: func(name string, args []string) bool { - if !matchContains("kubectl", "-n", "logging", "delete", "pod", "oauth2-proxy-terminating", "--wait=false", "--grace-period=0", "--force")(name, args) { - return false - } - deleted = append(deleted, "oauth2-proxy-terminating") - return true - }, - }, - { - match: func(name string, args []string) bool { - if !matchContains("kubectl", "-n", "longhorn-system", "delete", "pod", "pvc-backed-failed", "--wait=false")(name, args) { - return false - } - if strings.Contains(strings.Join(args, " "), "--force") { - t.Fatalf("pvc-backed stale pod must not be force deleted") - } - deleted = append(deleted, "pvc-backed-failed") - return true - }, - }, - }) - - if err := orch.recycleStuckControllerPods(context.Background()); err != nil { - t.Fatalf("recycleStuckControllerPods failed: %v", err) - } - if strings.Join(deleted, ",") != "longhorn-vault-sync-old,longhorn-vault-sync-failed,oauth2-proxy-terminating,pvc-backed-failed" { - t.Fatalf("expected only stale controller pods on Ready node to be recycled, got %#v", deleted) - } -} - -// TestRecycleStuckControllerPodsCordonsContainerRuntimeWedgeNode runs one orchestration or CLI step. -// Signature: TestRecycleStuckControllerPodsCordonsContainerRuntimeWedgeNode(t *testing.T). -// Why: a Ready node with a wedged container runtime can trap replacement pods -// indefinitely; startup should cordon that scheduler target without draining it -// or touching Longhorn data-plane objects. -func TestRecycleStuckControllerPodsCordonsContainerRuntimeWedgeNode(t *testing.T) { - old := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) - lastSeen := time.Now().UTC().Format(time.RFC3339) - pods := `{"items":[` + - `{"metadata":{"namespace":"logging","name":"oauth2-proxy-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"oauth2-proxy"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","containerStatuses":[{"name":"oauth2-proxy","state":{"waiting":{"reason":"CreateContainerError"}}}]}},` + - `{"metadata":{"namespace":"monitoring","name":"suite-probe-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"Job","name":"suite-probe"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","containerStatuses":[{"name":"probe","state":{"waiting":{"reason":"CreateContainerError"}}}]}},` + - `{"metadata":{"namespace":"sso","name":"secret-ensure-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"Job","name":"secret-ensure"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","initContainerStatuses":[{"name":"init","state":{"waiting":{"reason":"CreateContainerError"}}}]}},` + - `{"metadata":{"namespace":"finance","name":"single-node-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"single"}]},"spec":{"nodeName":"titan-19","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","containerStatuses":[{"name":"app","state":{"waiting":{"reason":"CreateContainerError"}}}]}}]}` - events := `{"items":[` + - `{"metadata":{"namespace":"logging","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"logging","name":"oauth2-proxy-bad"},"type":"Warning","reason":"Failed","message":"spec.containers{oauth2-proxy}: Error: failed to reserve container name oauth2-proxy_logging","lastTimestamp":"` + lastSeen + `"},` + - `{"metadata":{"namespace":"monitoring","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"suite-probe-bad"},"type":"Warning","reason":"Failed","message":"spec.containers{probe}: Error: context deadline exceeded","lastTimestamp":"` + lastSeen + `"},` + - `{"metadata":{"namespace":"sso","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"sso","name":"secret-ensure-bad"},"type":"Warning","reason":"Failed","message":"spec.initContainers{init}: Error: failed to reserve container name init_sso","lastTimestamp":"` + lastSeen + `"},` + - `{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"single-node-bad"},"type":"Warning","reason":"Failed","message":"spec.containers{app}: Error: failed to reserve container name app_finance","lastTimestamp":"` + lastSeen + `"}]}` - - cordoned := []string{} - deleted := []string{} - orch := buildOrchestratorWithStubs(t, config.Config{ - Startup: config.Startup{StuckPodGraceSeconds: 180}, - }, []commandStub{ - {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, - {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: ""}, - {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events}, - {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-18"},"status":{"conditions":[{"type":"Ready","status":"True"}]}},{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`}, - { - match: func(name string, args []string) bool { - if !matchContains("kubectl", "cordon")(name, args) { - return false - } - cordoned = append(cordoned, args[len(args)-1]) - return true - }, - }, - { - match: func(name string, args []string) bool { - if !matchContains("kubectl", "delete", "pod", "--wait=false")(name, args) { - return false - } - joined := strings.Join(args, " ") - if strings.Contains(joined, "--force") { - t.Fatalf("container-runtime wedge recycle must not force-delete fresh pods") - } - if len(args) >= 5 { - deleted = append(deleted, args[4]) - } - return true - }, - }, - }) - - if err := orch.recycleStuckControllerPods(context.Background()); err != nil { - t.Fatalf("recycleStuckControllerPods failed: %v", err) - } - if strings.Join(cordoned, ",") != "titan-18" { - t.Fatalf("expected only titan-18 to be cordoned, got %#v", cordoned) - } - if strings.Join(deleted, ",") != "oauth2-proxy-bad,suite-probe-bad,secret-ensure-bad,single-node-bad" { - t.Fatalf("expected runtime-wedged pods to be recycled, got %#v", deleted) - } -} - -// TestEffectiveWorkersFiltersIgnoredUnavailableNodes runs one orchestration or CLI step. -// Signature: TestEffectiveWorkersFiltersIgnoredUnavailableNodes(t *testing.T). -// Why: ignored unavailable nodes should be excluded before startup tries SSH, -// k3s-agent start, or uncordon operations against intentionally absent hosts. -func TestEffectiveWorkersFiltersIgnoredUnavailableNodes(t *testing.T) { - cfg := config.Config{ - Workers: []string{" titan-08 ", "titan-09", "titan-10", "titan-11"}, - Startup: config.Startup{ - IgnoreUnavailableNodes: []string{"titan-09", "titan-10"}, - }, - } - orch := buildOrchestratorWithStubs(t, cfg, nil) - got, err := orch.effectiveWorkers(context.Background()) - if err != nil { - t.Fatalf("effectiveWorkers failed: %v", err) - } - want := []string{"titan-08", "titan-11"} - if strings.Join(got, ",") != strings.Join(want, ",") { - t.Fatalf("effectiveWorkers mismatch got=%v want=%v", got, want) - } -} - -// TestEnsureLonghornEncryptedHostPrereqsFiltersUnsafeWorkers runs one orchestration or CLI step. -// Signature: TestEnsureLonghornEncryptedHostPrereqsFiltersUnsafeWorkers(t *testing.T). -// Why: startup must not uncordon Longhorn workers that cannot mount encrypted -// PVCs; cordoning those nodes is safe and avoids repeating the post-outage -// mount deadlock. -func TestEnsureLonghornEncryptedHostPrereqsFiltersUnsafeWorkers(t *testing.T) { - cordoned := []string{} - orch := buildOrchestratorWithStubs(t, config.Config{ - SSHManagedNodes: []string{"titan-04", "titan-19"}, - }, []commandStub{ - {match: matchContains("kubectl", "get", "nodes", "-l", "longhorn-host=true"), out: "titan-04\ntitan-19\ntitan-23\n"}, - { - match: matchContains("ssh", "titan-04", "command -v cryptsetup"), - out: "__ANANKE_CRYPTSETUP_PRESENT__", - }, - { - match: matchContains("ssh", "titan-19", "apt-get install -y --no-install-recommends cryptsetup-bin"), - err: errors.New("sudo: a password is required"), - }, - { - match: func(name string, args []string) bool { - if name != "kubectl" || len(args) == 0 || args[0] != "cordon" { - return false - } - if len(args) > 1 { - cordoned = append(cordoned, args[len(args)-1]) - } - return true - }, - }, - }) - - got, err := orch.ensureLonghornEncryptedHostPrereqs(context.Background(), []string{"titan-04", "titan-19", "titan-20"}) - if err != nil { - t.Fatalf("ensureLonghornEncryptedHostPrereqs failed: %v", err) - } - want := []string{"titan-04", "titan-20"} - if strings.Join(got, ",") != strings.Join(want, ",") { - t.Fatalf("guarded workers mismatch got=%v want=%v", got, want) - } - if strings.Join(cordoned, ",") != "titan-19,titan-23" { - t.Fatalf("expected unsafe longhorn hosts to be cordoned, got %v", cordoned) - } -} - -// TestLonghornCryptsetupExemptNodesAreNotQuarantined runs one orchestration or CLI step. -// Signature: TestLonghornCryptsetupExemptNodesAreNotQuarantined(t *testing.T). -// Why: Veles/Oceanus uses titan-23 as a Longhorn host for unencrypted local -// volumes; startup should uncordon that policy-exempt node without requiring -// host SSH or weakening encrypted-volume safety on other workers. -func TestLonghornCryptsetupExemptNodesAreNotQuarantined(t *testing.T) { - cordoned := []string{} - uncordoned := []string{} - sshTitan23 := false - orch := buildOrchestratorWithStubs(t, config.Config{ - SSHManagedNodes: []string{"titan-04"}, - Startup: config.Startup{ - LonghornCryptsetupExemptNodes: []string{"titan-23"}, - }, - }, []commandStub{ - {match: matchContains("kubectl", "get", "nodes", "-l", "longhorn-host=true"), out: "titan-04\ntitan-23\n"}, - { - match: matchContains("ssh", "titan-04", "command -v cryptsetup"), - out: "__ANANKE_CRYPTSETUP_PRESENT__", - }, - { - match: func(name string, args []string) bool { - if name == "ssh" && strings.Contains(strings.Join(args, " "), "titan-23") { - sshTitan23 = true - return true - } - return false - }, - }, - { - match: func(name string, args []string) bool { - if name != "kubectl" || len(args) == 0 || args[0] != "cordon" { - return false - } - if len(args) > 1 { - cordoned = append(cordoned, args[len(args)-1]) - } - return true - }, - }, - { - match: func(name string, args []string) bool { - if !matchContains("kubectl", "uncordon")(name, args) { - return false - } - if len(args) > 1 { - uncordoned = append(uncordoned, args[len(args)-1]) - } - return true - }, - }, - }) - - got, err := orch.ensureLonghornEncryptedHostPrereqs(context.Background(), []string{"titan-04"}) - if err != nil { - t.Fatalf("ensureLonghornEncryptedHostPrereqs failed: %v", err) - } - if strings.Join(got, ",") != "titan-04" { - t.Fatalf("guarded workers mismatch got=%v", got) - } - if err := orch.uncordonLonghornCryptsetupExemptNodes(context.Background()); err != nil { - t.Fatalf("uncordonLonghornCryptsetupExemptNodes failed: %v", err) - } - if sshTitan23 { - t.Fatalf("did not expect cryptsetup SSH check for exempt titan-23") - } - if len(cordoned) != 0 { - t.Fatalf("did not expect exempt node to be cordoned, got %v", cordoned) - } - if strings.Join(uncordoned, ",") != "titan-23" { - t.Fatalf("expected exempt titan-23 to be uncordoned, got %v", uncordoned) - } -} - -// TestLonghornHostNodesFallsBackToConfiguredLabels runs one orchestration or CLI step. -// Signature: TestLonghornHostNodesFallsBackToConfiguredLabels(t *testing.T). -// Why: bootstrap caches or minimal test clusters can lack live labels; the -// static startup inventory should still protect configured storage workers. -func TestLonghornHostNodesFallsBackToConfiguredLabels(t *testing.T) { - orch := buildOrchestratorWithStubs(t, config.Config{ - Startup: config.Startup{ - RequiredNodeLabels: map[string]map[string]string{ - "titan-04": {"longhorn-host": "true"}, - "titan-20": {"node-role.kubernetes.io/worker": "true"}, - }, - }, - }, []commandStub{ - {match: matchContains("kubectl", "get", "nodes", "-l", "longhorn-host=true"), out: ""}, - }) - - got, err := orch.longhornHostNodes(context.Background()) - if err != nil { - t.Fatalf("longhornHostNodes failed: %v", err) - } - if _, ok := got["titan-04"]; !ok || len(got) != 1 { - t.Fatalf("expected configured longhorn host fallback, got %v", got) - } -} - // TestNewConstructsOrchestrator runs one orchestration or CLI step. // Signature: TestNewConstructsOrchestrator(t *testing.T). // Why: covers constructor path in orchestrator core module. @@ -913,86 +466,3 @@ func TestRunSudoK3SFailsWhenAllCandidatesFail(t *testing.T) { t.Fatalf("expected runSudoK3S failure when all candidates fail") } } - -// TestCriticalEndpointHelpers runs one orchestration or CLI step. -// Signature: TestCriticalEndpointHelpers(t *testing.T). -// Why: covers critical endpoint parsing and readiness checks that gate startup completion. -func TestCriticalEndpointHelpers(t *testing.T) { - cfg := config.Config{ - Startup: config.Startup{ - CriticalServiceEndpoints: []string{"monitoring/victoria-metrics-single-server"}, - }, - } - orch := buildOrchestratorWithStubs(t, cfg, []commandStub{ - {match: matchContains("kubectl", "get endpoints victoria-metrics-single-server"), out: "10.42.0.10\n10.42.0.11\n"}, - }) - ok, detail, ns, svc, err := orch.criticalServiceEndpointsReady(context.Background()) - if err != nil || !ok { - t.Fatalf("expected criticalServiceEndpointsReady success, got ok=%v detail=%q ns=%q svc=%q err=%v", ok, detail, ns, svc, err) - } - if detail != "services=1" { - t.Fatalf("unexpected readiness detail: %q", detail) - } - gotNS, gotSvc, err := parseCriticalServiceEndpoint("monitoring/victoria-metrics-single-server") - if err != nil || gotNS != "monitoring" || gotSvc != "victoria-metrics-single-server" { - t.Fatalf("unexpected parse result ns=%q svc=%q err=%v", gotNS, gotSvc, err) - } - if _, _, err := parseCriticalServiceEndpoint("invalid"); err == nil { - t.Fatalf("expected parseCriticalServiceEndpoint error") - } -} - -// TestCriticalEndpointAutoHealWorkflow runs one orchestration or CLI step. -// Signature: TestCriticalEndpointAutoHealWorkflow(t *testing.T). -// Why: covers endpoint-zero recovery where startup heals workload replicas before succeeding. -func TestCriticalEndpointAutoHealWorkflow(t *testing.T) { - cfg := config.Config{ - Startup: config.Startup{ - CriticalServiceEndpointWaitSec: 2, - CriticalServiceEndpointPollSec: 1, - CriticalServiceEndpoints: []string{"monitoring/victoria-metrics-single-server"}, - }, - State: config.State{ - Dir: t.TempDir(), - ReportsDir: filepath.Join(t.TempDir(), "reports"), - RunHistoryPath: filepath.Join(t.TempDir(), "runs.json"), - }, - } - orch := &Orchestrator{ - cfg: cfg, - runner: &execx.Runner{}, - store: state.New(cfg.State.RunHistoryPath), - log: log.New(io.Discard, "", 0), - } - - endpointChecks := 0 - dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) { - joined := name + " " + strings.Join(args, " ") - if strings.Contains(joined, "get endpoints victoria-metrics-single-server") { - endpointChecks++ - if endpointChecks == 1 { - return "", nil - } - return "10.42.0.10\n", nil - } - if strings.Contains(joined, "scale deployment victoria-metrics-single-server") { - return "", errors.New(`Error from server (NotFound): deployments.apps "victoria-metrics-single-server" not found`) - } - if strings.Contains(joined, "scale statefulset victoria-metrics-single-server") { - return "", nil - } - if strings.Contains(joined, "rollout status statefulset/victoria-metrics-single-server") { - return "statefulset rolled out", nil - } - return "", nil - } - orch.runOverride = dispatch - orch.runSensitiveOverride = dispatch - - if err := orch.waitForCriticalServiceEndpoints(context.Background()); err != nil { - t.Fatalf("waitForCriticalServiceEndpoints failed: %v", err) - } - if endpointChecks < 2 { - t.Fatalf("expected repeated endpoint checks, got %d", endpointChecks) - } -} diff --git a/internal/cluster/orchestrator_workload_convergence.go b/internal/cluster/orchestrator_workload_convergence.go index 01c3d9d..8281a77 100644 --- a/internal/cluster/orchestrator_workload_convergence.go +++ b/internal/cluster/orchestrator_workload_convergence.go @@ -417,7 +417,8 @@ func (o *Orchestrator) quarantineContainerRuntimeWedgeNodes(ctx context.Context, continue } sort.Strings(keys) - if _, err := o.kubectl(ctx, 30*time.Second, "cordon", node); err != nil { + detail := fmt.Sprintf("pods=%d %s", len(keys), joinLimited(keys, 8)) + if err := o.cordonNodeWithLease(ctx, node, cordonReasonRuntimeWedge, detail); err != nil { o.log.Printf("warning: cordon container-runtime-wedged node %s failed: %v", node, err) continue } @@ -438,380 +439,3 @@ func (o *Orchestrator) quarantineContainerRuntimeWedgeNodes(ctx context.Context, } return nodes } - -// staleControllerPodReasons runs one orchestration or CLI step. -// Signature: (o *Orchestrator) staleControllerPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error). -// Why: after node or kubelet recovery, controller-owned pods can stay in -// terminal or unknown status even though the node is Ready and a replacement may -// already be healthy. A normal pod delete lets Kubernetes clean the stale status -// without touching storage objects or forcing deletion on a partitioned node. -func (o *Orchestrator) staleControllerPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) { - unavailable, err := o.unavailableNodeSet(ctx) - if err != nil { - return nil, err - } - reasons := map[string]string{} - for _, pod := range pods.Items { - ns := strings.TrimSpace(pod.Metadata.Namespace) - name := strings.TrimSpace(pod.Metadata.Name) - node := strings.TrimSpace(pod.Spec.NodeName) - if ns == "" || name == "" || node == "" { - continue - } - phase := strings.TrimSpace(pod.Status.Phase) - if !strings.EqualFold(phase, "Unknown") && !strings.EqualFold(phase, "Failed") { - continue - } - if _, badNode := unavailable[node]; badNode { - continue - } - if !podControllerOwned(pod) { - continue - } - if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace { - continue - } - reasons[ns+"/"+name] = "StaleControllerPodOnReadyNode:" + node + ":" + phase - } - return reasons, nil -} - -// staleControllerPodForceDeleteSafe runs one orchestration or CLI step. -// Signature: staleControllerPodForceDeleteSafe(pod podResource, grace time.Duration) bool. -// Why: a stale pod already marked for deletion may need force removal after a -// node outage. Keep that fallback away from PVC-bearing pods so Ananke never -// risks duplicating a storage writer. -func staleControllerPodForceDeleteSafe(pod podResource, grace time.Duration) bool { - if pod.Metadata.DeletionTimestamp == nil { - return false - } - if time.Since(*pod.Metadata.DeletionTimestamp) < grace { - return false - } - if podUsesPersistentVolumeClaim(pod) { - return false - } - return true -} - -// podUsesPersistentVolumeClaim runs one orchestration or CLI step. -// Signature: podUsesPersistentVolumeClaim(pod podResource) bool. -// Why: force-delete recovery is deliberately disallowed for pods with PVCs; the -// scheduler and storage controller need to settle those normally. -func podUsesPersistentVolumeClaim(pod podResource) bool { - for _, volume := range pod.Spec.Volumes { - if volume.PersistentVolumeClaim != nil && strings.TrimSpace(volume.PersistentVolumeClaim.ClaimName) != "" { - return true - } - } - return false -} - -// repairEncryptedVolumeMountPrereqs runs one orchestration or CLI step. -// Signature: (o *Orchestrator) repairEncryptedVolumeMountPrereqs(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error). -// Why: encrypted Longhorn volume mounts depend on host cryptsetup. After node -// rebuilds or partial OS recovery, Kubernetes may be ready while kubelet cannot -// mount encrypted PVCs; installing the missing host tool and recycling the -// controller-owned pod lets kubelet retry the same volume safely. -func (o *Orchestrator) repairEncryptedVolumeMountPrereqs(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) { - eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json") - if err != nil { - return nil, fmt.Errorf("query events for encrypted volume mount scan: %w", err) - } - var events eventList - if err := json.Unmarshal([]byte(eventsOut), &events); err != nil { - return nil, fmt.Errorf("decode events for encrypted volume mount scan: %w", err) - } - - podsByKey := map[string]podResource{} - for _, pod := range pods.Items { - ns := strings.TrimSpace(pod.Metadata.Namespace) - name := strings.TrimSpace(pod.Metadata.Name) - node := strings.TrimSpace(pod.Spec.NodeName) - if ns == "" || name == "" || node == "" { - continue - } - if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") { - continue - } - if !podControllerOwned(pod) { - continue - } - if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace { - continue - } - podsByKey[ns+"/"+name] = pod - } - if len(podsByKey) == 0 { - return map[string]string{}, nil - } - - repairedNodes := map[string]bool{} - reasons := map[string]string{} - for _, event := range events.Items { - if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") { - continue - } - if strings.TrimSpace(event.Reason) != "FailedMount" { - continue - } - if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") { - continue - } - key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name) - pod, ok := podsByKey[key] - if !ok { - continue - } - lastSeen := eventLastObservedAt(event) - if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && lastSeen.Before(pod.Metadata.CreationTimestamp) { - continue - } - message := strings.ToLower(strings.TrimSpace(event.Message)) - if !strings.Contains(message, "cryptsetup") || !strings.Contains(message, "no such file or directory") { - continue - } - node := strings.TrimSpace(pod.Spec.NodeName) - if node == "" || !o.sshManaged(node) { - o.log.Printf("warning: encrypted volume mount blocked on unmanaged node %s for pod %s", node, key) - continue - } - if repaired, ok := repairedNodes[node]; ok { - if repaired { - reasons[key] = "EncryptedVolumeCryptsetupRepaired:" + node - } - continue - } - if err := o.ensureHostCryptsetup(ctx, node); err != nil { - repairedNodes[node] = false - o.log.Printf("warning: cryptsetup prerequisite repair failed on %s for pod %s: %v", node, key, err) - if cordonErr := o.cordonNodeForMissingCryptsetup(ctx, node); cordonErr != nil { - o.log.Printf("warning: cordon failed after cryptsetup repair failure on %s for pod %s: %v", node, key, cordonErr) - continue - } - reasons[key] = "EncryptedVolumeCryptsetupNodeCordoned:" + node - continue - } - repairedNodes[node] = true - reasons[key] = "EncryptedVolumeCryptsetupRepaired:" + node - } - return reasons, nil -} - -// ensureHostCryptsetup runs one orchestration or CLI step. -// Signature: (o *Orchestrator) ensureHostCryptsetup(ctx context.Context, node string) error. -// Why: kubelet's encrypted Longhorn mount helper shells into the host namespace, -// so the package must exist on the node host, not merely inside a workload pod. -func (o *Orchestrator) ensureHostCryptsetup(ctx context.Context, node string) error { - command := strings.Join([]string{ - "set -eu", - "if command -v cryptsetup >/dev/null 2>&1 || [ -x /usr/sbin/cryptsetup ] || [ -x /usr/bin/cryptsetup ]; then echo __ANANKE_CRYPTSETUP_PRESENT__; exit 0; fi", - "if ! command -v apt-get >/dev/null 2>&1; then echo __ANANKE_CRYPTSETUP_NO_APT__; exit 42; fi", - "sudo -n env DEBIAN_FRONTEND=noninteractive apt-get update", - "sudo -n env DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends cryptsetup-bin", - "if command -v cryptsetup >/dev/null 2>&1 || [ -x /usr/sbin/cryptsetup ] || [ -x /usr/bin/cryptsetup ]; then echo __ANANKE_CRYPTSETUP_INSTALLED__; exit 0; fi", - "echo __ANANKE_CRYPTSETUP_INSTALL_FAILED__", - "exit 43", - }, "; ") - out, err := o.sshWithTimeout(ctx, node, command, 5*time.Minute) - if err != nil { - return fmt.Errorf("install cryptsetup-bin: %w (output=%s)", err, strings.TrimSpace(out)) - } - trimmed := strings.TrimSpace(out) - o.log.Printf("ensured cryptsetup prerequisite on %s: %s", node, trimmed) - if strings.Contains(trimmed, "__ANANKE_CRYPTSETUP_INSTALLED__") { - o.noteStartupAutoHeal(fmt.Sprintf("installed cryptsetup on %s", node)) - } - return nil -} - -// cordonNodeForMissingCryptsetup runs one orchestration or CLI step. -// Signature: (o *Orchestrator) cordonNodeForMissingCryptsetup(ctx context.Context, node string) error. -// Why: when host package repair is not permitted, cordoning is the safest -// automatic fallback: it prevents new encrypted-volume pods from landing on a -// node kubelet cannot mount from, while leaving existing workloads and storage -// objects untouched. -func (o *Orchestrator) cordonNodeForMissingCryptsetup(ctx context.Context, node string) error { - if _, err := o.kubectl(ctx, 30*time.Second, "cordon", node); err != nil { - return err - } - o.log.Printf("cordoned node %s after encrypted volume cryptsetup prerequisite failure", node) - o.noteStartupAutoHeal(fmt.Sprintf("cordoned %s after missing cryptsetup blocked encrypted volume mount", node)) - return nil -} - -// longhornAttachBlockedPodReasons runs one orchestration or CLI step. -// Signature: (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error). -// Why: after a power event, Kubernetes can schedule a Longhorn-backed pod onto a -// node Longhorn still marks unready. Recycling the unattached Pending pod lets -// the scheduler pick a Longhorn-ready node without touching Longhorn data-plane -// objects. -func (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) { - unreadyNodes, err := o.longhornUnreadyNodes(ctx) - if err != nil { - return nil, err - } - if len(unreadyNodes) == 0 { - return map[string]string{}, nil - } - - eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json") - if err != nil { - return nil, fmt.Errorf("query events for longhorn attach-blocked pod scan: %w", err) - } - var events eventList - if err := json.Unmarshal([]byte(eventsOut), &events); err != nil { - return nil, fmt.Errorf("decode events for longhorn attach-blocked pod scan: %w", err) - } - - podsByKey := map[string]podResource{} - for _, pod := range pods.Items { - ns := strings.TrimSpace(pod.Metadata.Namespace) - name := strings.TrimSpace(pod.Metadata.Name) - node := strings.TrimSpace(pod.Spec.NodeName) - if ns == "" || name == "" || node == "" { - continue - } - if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") { - continue - } - if _, unready := unreadyNodes[node]; !unready { - continue - } - if !podControllerOwned(pod) { - continue - } - if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace { - continue - } - podsByKey[ns+"/"+name] = pod - } - if len(podsByKey) == 0 { - return map[string]string{}, nil - } - - reasons := map[string]string{} - for _, event := range events.Items { - if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") { - continue - } - if strings.TrimSpace(event.Reason) != "FailedAttachVolume" { - continue - } - if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") { - continue - } - key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name) - pod, ok := podsByKey[key] - if !ok { - continue - } - lastSeen := eventLastObservedAt(event) - if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && lastSeen.Before(pod.Metadata.CreationTimestamp) { - continue - } - node := strings.TrimSpace(pod.Spec.NodeName) - message := strings.ToLower(strings.TrimSpace(event.Message)) - if !strings.Contains(message, "longhorn-backend") || !strings.Contains(message, "failed for volume") { - continue - } - if !strings.Contains(message, "node "+strings.ToLower(node)+" is not ready") { - continue - } - reasons[key] = "LonghornAttachBlockedOnUnreadyNode:" + node - } - return reasons, nil -} - -// longhornUnreadyNodes runs one orchestration or CLI step. -// Signature: (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error). -// Why: Longhorn node readiness can lag or intentionally differ from Kubernetes -// node readiness; attach recovery must use Longhorn's view for safety. -func (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error) { - out, err := o.kubectl(ctx, 30*time.Second, - "-n", "longhorn-system", - "get", "nodes.longhorn.io", - "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\t'}{range .status.conditions[?(@.type==\"Ready\")]}{.status}{end}{'\\n'}{end}", - ) - if err != nil { - if isNotFoundErr(err) { - return map[string]struct{}{}, nil - } - return nil, fmt.Errorf("query longhorn node readiness: %w", err) - } - unready := map[string]struct{}{} - for _, line := range lines(out) { - fields := strings.Fields(line) - if len(fields) < 2 { - continue - } - if !strings.EqualFold(strings.TrimSpace(fields[1]), "True") { - unready[strings.TrimSpace(fields[0])] = struct{}{} - } - } - return unready, nil -} - -// podControllerOwned runs one orchestration or CLI step. -// Signature: podControllerOwned(p podResource) bool. -// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. -func podControllerOwned(p podResource) bool { - for _, owner := range p.Metadata.OwnerReferences { - switch strings.TrimSpace(owner.Kind) { - case "ReplicaSet", "StatefulSet", "DaemonSet", "Job": - return true - } - } - return false -} - -// stuckContainerReason runs one orchestration or CLI step. -// Signature: stuckContainerReason(p podResource, reasons map[string]struct{}) string. -// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. -func stuckContainerReason(p podResource, reasons map[string]struct{}) string { - check := func(statuses []podContainerStatus) string { - for _, st := range statuses { - if st.State.Waiting == nil { - continue - } - reason := strings.TrimSpace(st.State.Waiting.Reason) - if reason == "" { - continue - } - if _, ok := reasons[reason]; ok { - return reason - } - } - return "" - } - if reason := check(p.Status.InitContainerStatuses); reason != "" { - return reason - } - return check(p.Status.ContainerStatuses) -} - -// stuckVaultInitReason runs one orchestration or CLI step. -// Signature: stuckVaultInitReason(p podResource, grace time.Duration) string. -// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. -func stuckVaultInitReason(p podResource, grace time.Duration) string { - if !strings.EqualFold(strings.TrimSpace(p.Status.Phase), "Pending") { - return "" - } - if !strings.EqualFold(strings.TrimSpace(p.Metadata.Annotations["vault.hashicorp.com/agent-inject"]), "true") { - return "" - } - for _, st := range p.Status.InitContainerStatuses { - if strings.TrimSpace(st.Name) != "vault-agent-init" || st.State.Running == nil { - continue - } - startedAt := st.State.Running.StartedAt - if startedAt.IsZero() { - continue - } - if time.Since(startedAt) < grace { - return "" - } - return "VaultInitStuck" - } - return "" -} diff --git a/internal/cluster/orchestrator_workload_recovery_test.go b/internal/cluster/orchestrator_workload_recovery_test.go new file mode 100644 index 0000000..fafd41f --- /dev/null +++ b/internal/cluster/orchestrator_workload_recovery_test.go @@ -0,0 +1,458 @@ +package cluster + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "scm.bstein.dev/bstein/ananke/internal/config" +) + +// TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods runs one orchestration or CLI step. +// Signature: TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T). +// Why: Pending Longhorn-backed pods on Longhorn-unready nodes should be +// rescheduled without mutating Longhorn volume, replica, or disk objects. +func TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T) { + created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) + lastSeen := time.Now().UTC().Format(time.RFC3339) + pods := `{"items":[{"metadata":{"namespace":"monitoring","name":"victoria-metrics-single-server-0","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"StatefulSet","name":"victoria-metrics-single-server"}]},"spec":{"nodeName":"titan-0b"},"status":{"phase":"Pending"}}]}` + events := `{"items":[{"metadata":{"namespace":"monitoring","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"victoria-metrics-single-server-0"},"type":"Warning","reason":"FailedAttachVolume","message":"AttachVolume.Attach failed for volume \"pvc-1\" : rpc error from [http://longhorn-backend:9500/v1/volumes/pvc-1?action=attach]: unable to attach volume pvc-1 to titan-0b: node titan-0b is not ready","lastTimestamp":"` + lastSeen + `"}]}` + + deleted := false + orch := buildOrchestratorWithStubs(t, config.Config{ + Startup: config.Startup{StuckPodGraceSeconds: 180}, + }, []commandStub{ + {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, + {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-0b\tFalse\n"}, + {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events}, + {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-0b"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`}, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "-n", "monitoring", "delete", "pod", "victoria-metrics-single-server-0", "--wait=false")(name, args) { + return false + } + deleted = true + return true + }, + }, + }) + + if err := orch.recycleStuckControllerPods(context.Background()); err != nil { + t.Fatalf("recycleStuckControllerPods failed: %v", err) + } + if !deleted { + t.Fatalf("expected longhorn attach-blocked pending pod to be recycled") + } +} + +// TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup runs one orchestration or CLI step. +// Signature: TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup(t *testing.T). +// Why: encrypted Longhorn PVC recovery should repair missing host cryptsetup and +// then recycle the blocked pod without touching Longhorn data-plane objects. +func TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup(t *testing.T) { + created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) + lastSeen := time.Now().UTC().Format(time.RFC3339) + pods := `{"items":[{"metadata":{"namespace":"finance","name":"actual-budget-abc","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"ReplicaSet","name":"actual-budget"}]},"spec":{"nodeName":"titan-19"},"status":{"phase":"Pending"}}]}` + events := `{"items":[{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"actual-budget-abc"},"type":"Warning","reason":"FailedMount","message":"MountVolume.MountDevice failed for volume \"pvc-1\" : nsenter: failed to execute cryptsetup: No such file or directory","lastTimestamp":"` + lastSeen + `"}]}` + + installed := false + deleted := false + orch := buildOrchestratorWithStubs(t, config.Config{ + Startup: config.Startup{StuckPodGraceSeconds: 180}, + }, []commandStub{ + {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, + {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-19\tTrue\n"}, + {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events}, + {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`}, + { + match: func(name string, args []string) bool { + if name != "ssh" || !strings.Contains(strings.Join(args, " "), "apt-get install -y --no-install-recommends cryptsetup-bin") { + return false + } + installed = true + return true + }, + out: "__ANANKE_CRYPTSETUP_INSTALLED__", + }, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "-n", "finance", "delete", "pod", "actual-budget-abc", "--wait=false")(name, args) { + return false + } + deleted = true + return true + }, + }, + }) + + if err := orch.recycleStuckControllerPods(context.Background()); err != nil { + t.Fatalf("recycleStuckControllerPods failed: %v", err) + } + if !installed { + t.Fatalf("expected missing host cryptsetup to be installed") + } + if !deleted { + t.Fatalf("expected encrypted-volume blocked pod to be recycled") + } +} + +// TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails runs one orchestration or CLI step. +// Signature: TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails(t *testing.T). +// Why: when host package repair is blocked by sudo policy, Ananke should avoid +// the bad node and retry the controller-owned pod elsewhere. +func TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails(t *testing.T) { + created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) + lastSeen := time.Now().UTC().Format(time.RFC3339) + pods := `{"items":[{"metadata":{"namespace":"finance","name":"actual-budget-abc","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"ReplicaSet","name":"actual-budget"}]},"spec":{"nodeName":"titan-19"},"status":{"phase":"Pending"}}]}` + events := `{"items":[{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"actual-budget-abc"},"type":"Warning","reason":"FailedMount","message":"MountVolume.MountDevice failed for volume \"pvc-1\" : nsenter: failed to execute cryptsetup: No such file or directory","lastTimestamp":"` + lastSeen + `"}]}` + + cordoned := false + deleted := false + orch := buildOrchestratorWithStubs(t, config.Config{ + Startup: config.Startup{StuckPodGraceSeconds: 180}, + }, []commandStub{ + {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, + {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-19\tTrue\n"}, + {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events}, + {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`}, + { + match: matchContains("ssh", "apt-get install -y --no-install-recommends cryptsetup-bin"), + err: errors.New("sudo: a password is required"), + }, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "cordon", "titan-19")(name, args) { + return false + } + cordoned = true + return true + }, + }, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "-n", "finance", "delete", "pod", "actual-budget-abc", "--wait=false")(name, args) { + return false + } + deleted = true + return true + }, + }, + }) + + if err := orch.recycleStuckControllerPods(context.Background()); err != nil { + t.Fatalf("recycleStuckControllerPods failed: %v", err) + } + if !cordoned { + t.Fatalf("expected cryptsetup-missing node to be cordoned") + } + if !deleted { + t.Fatalf("expected encrypted-volume blocked pod to be recycled") + } +} + +// TestRecycleStuckControllerPodsHandlesStalePodsOnReadyNodes runs one orchestration or CLI step. +// Signature: TestRecycleStuckControllerPodsHandlesStalePodsOnReadyNodes(t *testing.T). +// Why: post-outage controller pods can remain Unknown or Failed after their +// node recovers; deletion clears stale status while force deletion stays away +// from PVC-backed storage. +func TestRecycleStuckControllerPodsHandlesStalePodsOnReadyNodes(t *testing.T) { + old := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) + recent := time.Now().Add(-30 * time.Second).UTC().Format(time.RFC3339) + pods := `{"items":[` + + `{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-old","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}},` + + `{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-failed","creationTimestamp":"` + old + `","deletionTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12","volumes":[{"name":"secret"}]},"status":{"phase":"Failed"}},` + + `{"metadata":{"namespace":"logging","name":"oauth2-proxy-terminating","creationTimestamp":"` + old + `","deletionTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"oauth2-proxy-logs"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"secret"}]},"status":{"phase":"Running"}},` + + `{"metadata":{"namespace":"longhorn-system","name":"pvc-backed-failed","creationTimestamp":"` + old + `","deletionTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"pvc-backed"}]},"spec":{"nodeName":"titan-12","volumes":[{"name":"data","persistentVolumeClaim":{"claimName":"data"}}]},"status":{"phase":"Failed"}},` + + `{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-fresh","creationTimestamp":"` + recent + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}},` + + `{"metadata":{"namespace":"maintenance","name":"stale-on-bad-node","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"maintenance"}]},"spec":{"nodeName":"titan-22"},"status":{"phase":"Unknown"}},` + + `{"metadata":{"namespace":"default","name":"bare-pod","creationTimestamp":"` + old + `"},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}}]}` + + deleted := []string{} + orch := buildOrchestratorWithStubs(t, config.Config{ + Startup: config.Startup{StuckPodGraceSeconds: 180}, + }, []commandStub{ + {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, + {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-12\tTrue\ntitan-22\tTrue\n"}, + {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: `{"items":[]}`}, + {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-12"},"status":{"conditions":[{"type":"Ready","status":"True"}]}},{"metadata":{"name":"titan-22"},"status":{"conditions":[{"type":"Ready","status":"False"}]}}]}`}, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "-n", "longhorn-system", "delete", "pod", "longhorn-vault-sync-old", "--wait=false")(name, args) { + return false + } + deleted = append(deleted, "longhorn-vault-sync-old") + return true + }, + }, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "-n", "longhorn-system", "delete", "pod", "longhorn-vault-sync-failed", "--wait=false", "--grace-period=0", "--force")(name, args) { + return false + } + deleted = append(deleted, "longhorn-vault-sync-failed") + return true + }, + }, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "-n", "logging", "delete", "pod", "oauth2-proxy-terminating", "--wait=false", "--grace-period=0", "--force")(name, args) { + return false + } + deleted = append(deleted, "oauth2-proxy-terminating") + return true + }, + }, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "-n", "longhorn-system", "delete", "pod", "pvc-backed-failed", "--wait=false")(name, args) { + return false + } + if strings.Contains(strings.Join(args, " "), "--force") { + t.Fatalf("pvc-backed stale pod must not be force deleted") + } + deleted = append(deleted, "pvc-backed-failed") + return true + }, + }, + }) + + if err := orch.recycleStuckControllerPods(context.Background()); err != nil { + t.Fatalf("recycleStuckControllerPods failed: %v", err) + } + if strings.Join(deleted, ",") != "longhorn-vault-sync-old,longhorn-vault-sync-failed,oauth2-proxy-terminating,pvc-backed-failed" { + t.Fatalf("expected only stale controller pods on Ready node to be recycled, got %#v", deleted) + } +} + +// TestRecycleStuckControllerPodsCordonsContainerRuntimeWedgeNode runs one orchestration or CLI step. +// Signature: TestRecycleStuckControllerPodsCordonsContainerRuntimeWedgeNode(t *testing.T). +// Why: a Ready node with a wedged container runtime can trap replacement pods +// indefinitely; startup should cordon that scheduler target without draining it +// or touching Longhorn data-plane objects. +func TestRecycleStuckControllerPodsCordonsContainerRuntimeWedgeNode(t *testing.T) { + old := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) + lastSeen := time.Now().UTC().Format(time.RFC3339) + pods := `{"items":[` + + `{"metadata":{"namespace":"logging","name":"oauth2-proxy-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"oauth2-proxy"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","containerStatuses":[{"name":"oauth2-proxy","state":{"waiting":{"reason":"CreateContainerError"}}}]}},` + + `{"metadata":{"namespace":"monitoring","name":"suite-probe-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"Job","name":"suite-probe"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","containerStatuses":[{"name":"probe","state":{"waiting":{"reason":"CreateContainerError"}}}]}},` + + `{"metadata":{"namespace":"sso","name":"secret-ensure-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"Job","name":"secret-ensure"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","initContainerStatuses":[{"name":"init","state":{"waiting":{"reason":"CreateContainerError"}}}]}},` + + `{"metadata":{"namespace":"finance","name":"single-node-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"single"}]},"spec":{"nodeName":"titan-19","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","containerStatuses":[{"name":"app","state":{"waiting":{"reason":"CreateContainerError"}}}]}}]}` + events := `{"items":[` + + `{"metadata":{"namespace":"logging","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"logging","name":"oauth2-proxy-bad"},"type":"Warning","reason":"Failed","message":"spec.containers{oauth2-proxy}: Error: failed to reserve container name oauth2-proxy_logging","lastTimestamp":"` + lastSeen + `"},` + + `{"metadata":{"namespace":"monitoring","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"suite-probe-bad"},"type":"Warning","reason":"Failed","message":"spec.containers{probe}: Error: context deadline exceeded","lastTimestamp":"` + lastSeen + `"},` + + `{"metadata":{"namespace":"sso","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"sso","name":"secret-ensure-bad"},"type":"Warning","reason":"Failed","message":"spec.initContainers{init}: Error: failed to reserve container name init_sso","lastTimestamp":"` + lastSeen + `"},` + + `{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"single-node-bad"},"type":"Warning","reason":"Failed","message":"spec.containers{app}: Error: failed to reserve container name app_finance","lastTimestamp":"` + lastSeen + `"}]}` + + cordoned := []string{} + deleted := []string{} + orch := buildOrchestratorWithStubs(t, config.Config{ + Startup: config.Startup{StuckPodGraceSeconds: 180}, + }, []commandStub{ + {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, + {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: ""}, + {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events}, + {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-18"},"status":{"conditions":[{"type":"Ready","status":"True"}]}},{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`}, + { + match: func(name string, args []string) bool { + if name != "kubectl" || len(args) == 0 || args[0] != "cordon" { + return false + } + cordoned = append(cordoned, args[len(args)-1]) + return true + }, + }, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "delete", "pod", "--wait=false")(name, args) { + return false + } + joined := strings.Join(args, " ") + if strings.Contains(joined, "--force") { + t.Fatalf("container-runtime wedge recycle must not force-delete fresh pods") + } + if len(args) >= 5 { + deleted = append(deleted, args[4]) + } + return true + }, + }, + }) + + if err := orch.recycleStuckControllerPods(context.Background()); err != nil { + t.Fatalf("recycleStuckControllerPods failed: %v", err) + } + if strings.Join(cordoned, ",") != "titan-18" { + t.Fatalf("expected only titan-18 to be cordoned, got %#v", cordoned) + } + if strings.Join(deleted, ",") != "oauth2-proxy-bad,suite-probe-bad,secret-ensure-bad,single-node-bad" { + t.Fatalf("expected runtime-wedged pods to be recycled, got %#v", deleted) + } +} + +// TestEffectiveWorkersFiltersIgnoredUnavailableNodes runs one orchestration or CLI step. +// Signature: TestEffectiveWorkersFiltersIgnoredUnavailableNodes(t *testing.T). +// Why: ignored unavailable nodes should be excluded before startup tries SSH, +// k3s-agent start, or uncordon operations against intentionally absent hosts. +func TestEffectiveWorkersFiltersIgnoredUnavailableNodes(t *testing.T) { + cfg := config.Config{ + Workers: []string{" titan-08 ", "titan-09", "titan-10", "titan-11"}, + Startup: config.Startup{ + IgnoreUnavailableNodes: []string{"titan-09", "titan-10"}, + }, + } + orch := buildOrchestratorWithStubs(t, cfg, nil) + got, err := orch.effectiveWorkers(context.Background()) + if err != nil { + t.Fatalf("effectiveWorkers failed: %v", err) + } + want := []string{"titan-08", "titan-11"} + if strings.Join(got, ",") != strings.Join(want, ",") { + t.Fatalf("effectiveWorkers mismatch got=%v want=%v", got, want) + } +} + +// TestEnsureLonghornEncryptedHostPrereqsFiltersUnsafeWorkers runs one orchestration or CLI step. +// Signature: TestEnsureLonghornEncryptedHostPrereqsFiltersUnsafeWorkers(t *testing.T). +// Why: startup must not uncordon Longhorn workers that cannot mount encrypted +// PVCs; cordoning those nodes is safe and avoids repeating the post-outage +// mount deadlock. +func TestEnsureLonghornEncryptedHostPrereqsFiltersUnsafeWorkers(t *testing.T) { + cordoned := []string{} + orch := buildOrchestratorWithStubs(t, config.Config{ + SSHManagedNodes: []string{"titan-04", "titan-19"}, + }, []commandStub{ + {match: matchContains("kubectl", "get", "nodes", "-l", "longhorn-host=true"), out: "titan-04\ntitan-19\ntitan-23\n"}, + { + match: matchContains("ssh", "titan-04", "command -v cryptsetup"), + out: "__ANANKE_CRYPTSETUP_PRESENT__", + }, + { + match: matchContains("ssh", "titan-19", "apt-get install -y --no-install-recommends cryptsetup-bin"), + err: errors.New("sudo: a password is required"), + }, + { + match: func(name string, args []string) bool { + if name != "kubectl" || len(args) == 0 || args[0] != "cordon" { + return false + } + if len(args) > 1 { + cordoned = append(cordoned, args[len(args)-1]) + } + return true + }, + }, + }) + + got, err := orch.ensureLonghornEncryptedHostPrereqs(context.Background(), []string{"titan-04", "titan-19", "titan-20"}) + if err != nil { + t.Fatalf("ensureLonghornEncryptedHostPrereqs failed: %v", err) + } + want := []string{"titan-04", "titan-20"} + if strings.Join(got, ",") != strings.Join(want, ",") { + t.Fatalf("guarded workers mismatch got=%v want=%v", got, want) + } + if strings.Join(cordoned, ",") != "titan-19,titan-23" { + t.Fatalf("expected unsafe longhorn hosts to be cordoned, got %v", cordoned) + } +} + +// TestLonghornCryptsetupExemptNodesAreNotQuarantined runs one orchestration or CLI step. +// Signature: TestLonghornCryptsetupExemptNodesAreNotQuarantined(t *testing.T). +// Why: Veles/Oceanus uses titan-23 as a Longhorn host for unencrypted local +// volumes; startup should uncordon that policy-exempt node without requiring +// host SSH or weakening encrypted-volume safety on other workers. +func TestLonghornCryptsetupExemptNodesAreNotQuarantined(t *testing.T) { + cordoned := []string{} + uncordoned := []string{} + sshTitan23 := false + orch := buildOrchestratorWithStubs(t, config.Config{ + SSHManagedNodes: []string{"titan-04"}, + Startup: config.Startup{ + LonghornCryptsetupExemptNodes: []string{"titan-23"}, + }, + }, []commandStub{ + {match: matchContains("kubectl", "get", "nodes", "-l", "longhorn-host=true"), out: "titan-04\ntitan-23\n"}, + { + match: matchContains("ssh", "titan-04", "command -v cryptsetup"), + out: "__ANANKE_CRYPTSETUP_PRESENT__", + }, + { + match: func(name string, args []string) bool { + if name == "ssh" && strings.Contains(strings.Join(args, " "), "titan-23") { + sshTitan23 = true + return true + } + return false + }, + }, + { + match: func(name string, args []string) bool { + if name != "kubectl" || len(args) == 0 || args[0] != "cordon" { + return false + } + if len(args) > 1 { + cordoned = append(cordoned, args[len(args)-1]) + } + return true + }, + }, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "uncordon")(name, args) { + return false + } + if len(args) > 1 { + uncordoned = append(uncordoned, args[len(args)-1]) + } + return true + }, + }, + }) + + got, err := orch.ensureLonghornEncryptedHostPrereqs(context.Background(), []string{"titan-04"}) + if err != nil { + t.Fatalf("ensureLonghornEncryptedHostPrereqs failed: %v", err) + } + if strings.Join(got, ",") != "titan-04" { + t.Fatalf("guarded workers mismatch got=%v", got) + } + if err := orch.uncordonLonghornCryptsetupExemptNodes(context.Background()); err != nil { + t.Fatalf("uncordonLonghornCryptsetupExemptNodes failed: %v", err) + } + if sshTitan23 { + t.Fatalf("did not expect cryptsetup SSH check for exempt titan-23") + } + if len(cordoned) != 0 { + t.Fatalf("did not expect exempt node to be cordoned, got %v", cordoned) + } + if strings.Join(uncordoned, ",") != "titan-23" { + t.Fatalf("expected exempt titan-23 to be uncordoned, got %v", uncordoned) + } +} + +// TestLonghornHostNodesFallsBackToConfiguredLabels runs one orchestration or CLI step. +// Signature: TestLonghornHostNodesFallsBackToConfiguredLabels(t *testing.T). +// Why: bootstrap caches or minimal test clusters can lack live labels; the +// static startup inventory should still protect configured storage workers. +func TestLonghornHostNodesFallsBackToConfiguredLabels(t *testing.T) { + orch := buildOrchestratorWithStubs(t, config.Config{ + Startup: config.Startup{ + RequiredNodeLabels: map[string]map[string]string{ + "titan-04": {"longhorn-host": "true"}, + "titan-20": {"node-role.kubernetes.io/worker": "true"}, + }, + }, + }, []commandStub{ + {match: matchContains("kubectl", "get", "nodes", "-l", "longhorn-host=true"), out: ""}, + }) + + got, err := orch.longhornHostNodes(context.Background()) + if err != nil { + t.Fatalf("longhornHostNodes failed: %v", err) + } + if _, ok := got["titan-04"]; !ok || len(got) != 1 { + t.Fatalf("expected configured longhorn host fallback, got %v", got) + } +} diff --git a/internal/cluster/orchestrator_workload_status.go b/internal/cluster/orchestrator_workload_status.go new file mode 100644 index 0000000..e665444 --- /dev/null +++ b/internal/cluster/orchestrator_workload_status.go @@ -0,0 +1,49 @@ +package cluster + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" +) + +// workloadReady runs one orchestration or CLI step. +// Signature: (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error). +// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. +func (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error) { + out, err := o.kubectl( + ctx, + 20*time.Second, + "-n", + w.Namespace, + "get", + w.Kind, + w.Name, + "-o", + "jsonpath={.status.readyReplicas}", + ) + if err != nil { + return false, err + } + raw := strings.TrimSpace(out) + if raw == "" || raw == "" { + return false, nil + } + n, err := strconv.Atoi(raw) + if err != nil { + return false, fmt.Errorf("parse readyReplicas %q: %w", raw, err) + } + return n >= 1, nil +} + +// isNotFoundErr runs one orchestration or CLI step. +// Signature: isNotFoundErr(err error) bool. +// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. +func isNotFoundErr(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "not found") || strings.Contains(msg, "(notfound)") +} diff --git a/internal/config/apply_defaults.go b/internal/config/apply_defaults.go index 408a1e7..267b208 100644 --- a/internal/config/apply_defaults.go +++ b/internal/config/apply_defaults.go @@ -201,6 +201,9 @@ func (c *Config) applyDefaults() { if c.Startup.PostStartAutoHealSeconds <= 0 { c.Startup.PostStartAutoHealSeconds = 60 } + if c.Startup.RecoveryCordonMaxSeconds <= 0 { + c.Startup.RecoveryCordonMaxSeconds = 3600 + } if c.Startup.DeadNodeCleanupGraceSeconds <= 0 { c.Startup.DeadNodeCleanupGraceSeconds = 300 } diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 04fd065..c8547ab 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -121,6 +121,7 @@ func defaults() Config { IgnoreUnavailableNodes: []string{}, AutoRecycleStuckPods: true, StuckPodGraceSeconds: 180, + RecoveryCordonMaxSeconds: 3600, VaultUnsealKeyFile: "/var/lib/ananke/vault-unseal.key", VaultUnsealBreakglassTimeout: 15, }, diff --git a/internal/config/types.go b/internal/config/types.go index 7ebe94f..b5478f3 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -94,6 +94,7 @@ type Startup struct { SchedulingStormWindowSeconds int `yaml:"scheduling_storm_window_seconds"` StuckPodGraceSeconds int `yaml:"stuck_pod_grace_seconds"` PostStartAutoHealSeconds int `yaml:"post_start_auto_heal_seconds"` + RecoveryCordonMaxSeconds int `yaml:"recovery_cordon_max_seconds"` DeadNodeCleanupGraceSeconds int `yaml:"dead_node_cleanup_grace_seconds"` VaultUnsealKeyFile string `yaml:"vault_unseal_key_file"` VaultUnsealBreakglassCommand string `yaml:"vault_unseal_breakglass_command"` diff --git a/internal/config/validate.go b/internal/config/validate.go index 7ed707d..7f6b819 100644 --- a/internal/config/validate.go +++ b/internal/config/validate.go @@ -280,6 +280,9 @@ func (c Config) Validate() error { if c.Startup.PostStartAutoHealSeconds <= 0 { return fmt.Errorf("config.startup.post_start_auto_heal_seconds must be > 0") } + if c.Startup.RecoveryCordonMaxSeconds <= 0 { + return fmt.Errorf("config.startup.recovery_cordon_max_seconds must be > 0") + } if c.Startup.DeadNodeCleanupGraceSeconds <= 0 { return fmt.Errorf("config.startup.dead_node_cleanup_grace_seconds must be > 0") } diff --git a/testing/hygiene/in_tree_test_allowlist.txt b/testing/hygiene/in_tree_test_allowlist.txt index 923cfd5..1c98892 100644 --- a/testing/hygiene/in_tree_test_allowlist.txt +++ b/testing/hygiene/in_tree_test_allowlist.txt @@ -16,8 +16,11 @@ internal/cluster/orchestrator_report_test.go internal/cluster/orchestrator_autorepair_test.go internal/cluster/orchestrator_autorepair_cleanup_test.go internal/cluster/orchestrator_autorepair_proxy_test.go +internal/cluster/orchestrator_critical_endpoint_additional_test.go +internal/cluster/orchestrator_cordon_lease_test.go internal/cluster/orchestrator_test.go internal/cluster/orchestrator_unit_additional_test.go +internal/cluster/orchestrator_workload_recovery_test.go internal/cluster/orchestrator_vault_test.go internal/config/config_test.go internal/config/load_additional_test.go