diff --git a/configs/ananke.tethys.yaml b/configs/ananke.tethys.yaml index c0d8005..f245853 100644 --- a/configs/ananke.tethys.yaml +++ b/configs/ananke.tethys.yaml @@ -280,7 +280,8 @@ startup: - harbor ignore_workload_namespaces: [] ignore_workloads: [] - ignore_unavailable_nodes: [] + ignore_unavailable_nodes: + - titan-09 auto_recycle_stuck_pods: true auto_quarantine_scheduling_storms: true scheduling_storm_event_threshold: 30 diff --git a/configs/ananke.titan-db.yaml b/configs/ananke.titan-db.yaml index 4bcb39e..84545dc 100644 --- a/configs/ananke.titan-db.yaml +++ b/configs/ananke.titan-db.yaml @@ -280,7 +280,8 @@ startup: - harbor ignore_workload_namespaces: [] ignore_workloads: [] - ignore_unavailable_nodes: [] + ignore_unavailable_nodes: + - titan-09 auto_recycle_stuck_pods: true auto_quarantine_scheduling_storms: true scheduling_storm_event_threshold: 30 diff --git a/internal/cluster/orchestrator_critical_vault.go b/internal/cluster/orchestrator_critical_vault.go index c93136e..c4f5c66 100644 --- a/internal/cluster/orchestrator_critical_vault.go +++ b/internal/cluster/orchestrator_critical_vault.go @@ -162,24 +162,35 @@ func (o *Orchestrator) waitWorkloadReady(ctx context.Context, w startupWorkload) return o.waitVaultReady(ctx, w) } - timeout := "240s" - if w.Kind == "statefulset" { - timeout = "360s" + deadline := time.Now().Add(7 * time.Minute) + var lastErr error + for time.Now().Before(deadline) { + o.bestEffort("recycle stuck critical workload pods", func() error { return o.recycleStuckControllerPods(ctx) }) + _, err := o.kubectl( + ctx, + 45*time.Second, + "-n", + w.Namespace, + "rollout", + "status", + fmt.Sprintf("%s/%s", w.Kind, w.Name), + "--timeout=30s", + ) + if err == nil { + return nil + } + lastErr = err + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + } } - _, err := o.kubectl( - ctx, - 7*time.Minute, - "-n", - w.Namespace, - "rollout", - "status", - fmt.Sprintf("%s/%s", w.Kind, w.Name), - "--timeout="+timeout, - ) - if err != nil { - return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err) + if lastErr != nil { + return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, lastErr) } - return nil + return fmt.Errorf("wait ready %s/%s/%s: timeout", w.Namespace, w.Kind, w.Name) } // waitVaultReady runs one orchestration or CLI step. diff --git a/internal/cluster/orchestrator_lifecycle.go b/internal/cluster/orchestrator_lifecycle.go index 5e849be..e698c8d 100644 --- a/internal/cluster/orchestrator_lifecycle.go +++ b/internal/cluster/orchestrator_lifecycle.go @@ -45,12 +45,17 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er } o.noteStartupCheck("node-inventory-reachability", true, "all expected nodes responded over SSH") resumedFlux := false + allowFailedStartupFluxResume := false defer func() { if o.runner.DryRun || err == nil || resumedFlux { return } - o.log.Printf("warning: startup failed before normal flux resume; attempting best-effort recovery resume") o.bestEffort("restore scaled workloads after failed startup", func() error { return o.restoreScaledApps(ctx) }) + if !allowFailedStartupFluxResume { + o.log.Printf("warning: startup failed before recovery-safe flux resume; leaving flux suspension state unchanged") + return + } + o.log.Printf("warning: startup failed during flux resume; attempting best-effort recovery resume") o.bestEffort("resume flux after failed startup", func() error { return o.resumeFluxAndReconcile(ctx) }) }() @@ -292,6 +297,7 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er o.noteStartupCheckState("flux-resume-reconcile", "running", "resuming flux and waiting for reconcile") o.setStartupPhase("flux-resume", "resuming flux controllers and reconciling kustomizations") + allowFailedStartupFluxResume = true if err := o.resumeFluxAndReconcile(ctx); err != nil { o.noteStartupCheck("flux-resume-reconcile", false, err.Error()) return err diff --git a/internal/cluster/orchestrator_unit_additional_test.go b/internal/cluster/orchestrator_unit_additional_test.go index 1d63675..996df2d 100644 --- a/internal/cluster/orchestrator_unit_additional_test.go +++ b/internal/cluster/orchestrator_unit_additional_test.go @@ -8,6 +8,7 @@ import ( "net" "path/filepath" "strings" + "sync" "testing" "time" @@ -73,6 +74,109 @@ func matchContains(cmd string, parts ...string) func(string, []string) bool { } } +// TestStartupEarlyFailureLeavesFluxSuspensionUnchanged runs one orchestration or CLI step. +// Signature: TestStartupEarlyFailureLeavesFluxSuspensionUnchanged(t *testing.T). +// Why: recovery must not release Flux when bootstrap fails before storage and +// critical workloads are ready, or Flux can re-create the same dependency loop. +func TestStartupEarlyFailureLeavesFluxSuspensionUnchanged(t *testing.T) { + tmpDir := t.TempDir() + cfg := config.Config{ + SSHPort: 2277, + Startup: config.Startup{ + APIWaitSeconds: 1, + APIPollSeconds: 1, + RequireNodeInventoryReach: false, + RequireTimeSync: false, + RequireNodeSSHAuth: false, + ReconcileAccessOnBoot: false, + AutoEtcdRestoreOnAPIFailure: false, + RequiredNodeLabels: map[string]map[string]string{ + "titan-missing": { + "node-role.kubernetes.io/worker": "true", + }, + }, + }, + State: config.State{ + Dir: tmpDir, + ReportsDir: filepath.Join(tmpDir, "reports"), + RunHistoryPath: filepath.Join(tmpDir, "runs.json"), + LockPath: filepath.Join(tmpDir, "ananke.lock"), + IntentPath: filepath.Join(tmpDir, "intent.json"), + }, + } + + var mu sync.Mutex + calls := []string{} + orch := buildOrchestratorWithStubs(t, cfg, []commandStub{ + { + match: func(name string, args []string) bool { + mu.Lock() + calls = append(calls, name+" "+strings.Join(args, " ")) + mu.Unlock() + return false + }, + }, + {match: matchContains("kubectl", "version", "--request-timeout=5s"), out: "ok"}, + {match: matchContains("kubectl", "-n", "vault", "get", "pod", "vault-0"), out: "Pending"}, + { + match: matchContains("kubectl", "label", "node", "titan-missing"), + err: errors.New(`nodes "titan-missing" not found`), + }, + }) + + err := orch.Startup(context.Background(), StartupOptions{Reason: "test early failure"}) + if err == nil { + t.Fatalf("expected startup to fail before flux resume") + } + if !strings.Contains(err.Error(), "ensure required node labels on titan-missing") { + t.Fatalf("expected required-label failure, got: %v", err) + } + + mu.Lock() + defer mu.Unlock() + for _, call := range calls { + if strings.Contains(call, `"suspend":false`) { + t.Fatalf("early failed startup unexpectedly resumed flux via call: %s", call) + } + } +} + +// TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods runs one orchestration or CLI step. +// Signature: TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T). +// Why: Pending Longhorn-backed pods on Longhorn-unready nodes should be +// rescheduled without mutating Longhorn volume, replica, or disk objects. +func TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T) { + created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) + lastSeen := time.Now().UTC().Format(time.RFC3339) + pods := `{"items":[{"metadata":{"namespace":"monitoring","name":"victoria-metrics-single-server-0","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"StatefulSet","name":"victoria-metrics-single-server"}]},"spec":{"nodeName":"titan-0b"},"status":{"phase":"Pending"}}]}` + events := `{"items":[{"metadata":{"namespace":"monitoring","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"victoria-metrics-single-server-0"},"type":"Warning","reason":"FailedAttachVolume","message":"AttachVolume.Attach failed for volume \"pvc-1\" : rpc error from [http://longhorn-backend:9500/v1/volumes/pvc-1?action=attach]: unable to attach volume pvc-1 to titan-0b: node titan-0b is not ready","lastTimestamp":"` + lastSeen + `"}]}` + + deleted := false + orch := buildOrchestratorWithStubs(t, config.Config{ + Startup: config.Startup{StuckPodGraceSeconds: 180}, + }, []commandStub{ + {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, + {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-0b\tFalse\n"}, + {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events}, + { + match: func(name string, args []string) bool { + if !matchContains("kubectl", "-n", "monitoring", "delete", "pod", "victoria-metrics-single-server-0", "--wait=false")(name, args) { + return false + } + deleted = true + return true + }, + }, + }) + + if err := orch.recycleStuckControllerPods(context.Background()); err != nil { + t.Fatalf("recycleStuckControllerPods failed: %v", err) + } + if !deleted { + t.Fatalf("expected longhorn attach-blocked pending pod to be recycled") + } +} + // TestNewConstructsOrchestrator runs one orchestration or CLI step. // Signature: TestNewConstructsOrchestrator(t *testing.T). // Why: covers constructor path in orchestrator core module. diff --git a/internal/cluster/orchestrator_workload_convergence.go b/internal/cluster/orchestrator_workload_convergence.go index 31f648c..7add98a 100644 --- a/internal/cluster/orchestrator_workload_convergence.go +++ b/internal/cluster/orchestrator_workload_convergence.go @@ -170,6 +170,12 @@ func (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error { "CreateContainerConfigError": {}, "CreateContainerError": {}, } + longhornAttachReasons := map[string]string{} + if reasons, scanErr := o.longhornAttachBlockedPodReasons(ctx, list, grace); scanErr != nil { + o.log.Printf("warning: longhorn attach-blocked pod scan failed: %v", scanErr) + } else { + longhornAttachReasons = reasons + } recycled := []string{} for _, pod := range list.Items { ns := strings.TrimSpace(pod.Metadata.Namespace) @@ -197,6 +203,9 @@ func (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error { if reason == "" { reason = stuckVaultInitReason(pod, grace) } + if reason == "" { + reason = longhornAttachReasons[ns+"/"+name] + } if reason == "" { continue } @@ -215,6 +224,118 @@ func (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error { return nil } +// longhornAttachBlockedPodReasons runs one orchestration or CLI step. +// Signature: (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error). +// Why: after a power event, Kubernetes can schedule a Longhorn-backed pod onto a +// node Longhorn still marks unready. Recycling the unattached Pending pod lets +// the scheduler pick a Longhorn-ready node without touching Longhorn data-plane +// objects. +func (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) { + unreadyNodes, err := o.longhornUnreadyNodes(ctx) + if err != nil { + return nil, err + } + if len(unreadyNodes) == 0 { + return map[string]string{}, nil + } + + eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json") + if err != nil { + return nil, fmt.Errorf("query events for longhorn attach-blocked pod scan: %w", err) + } + var events eventList + if err := json.Unmarshal([]byte(eventsOut), &events); err != nil { + return nil, fmt.Errorf("decode events for longhorn attach-blocked pod scan: %w", err) + } + + podsByKey := map[string]podResource{} + for _, pod := range pods.Items { + ns := strings.TrimSpace(pod.Metadata.Namespace) + name := strings.TrimSpace(pod.Metadata.Name) + node := strings.TrimSpace(pod.Spec.NodeName) + if ns == "" || name == "" || node == "" { + continue + } + if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") { + continue + } + if _, unready := unreadyNodes[node]; !unready { + continue + } + if !podControllerOwned(pod) { + continue + } + if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace { + continue + } + podsByKey[ns+"/"+name] = pod + } + if len(podsByKey) == 0 { + return map[string]string{}, nil + } + + reasons := map[string]string{} + for _, event := range events.Items { + if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") { + continue + } + if strings.TrimSpace(event.Reason) != "FailedAttachVolume" { + continue + } + if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") { + continue + } + key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name) + pod, ok := podsByKey[key] + if !ok { + continue + } + lastSeen := eventLastObservedAt(event) + if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && lastSeen.Before(pod.Metadata.CreationTimestamp) { + continue + } + node := strings.TrimSpace(pod.Spec.NodeName) + message := strings.ToLower(strings.TrimSpace(event.Message)) + if !strings.Contains(message, "longhorn-backend") || !strings.Contains(message, "failed for volume") { + continue + } + if !strings.Contains(message, "node "+strings.ToLower(node)+" is not ready") { + continue + } + reasons[key] = "LonghornAttachBlockedOnUnreadyNode:" + node + } + return reasons, nil +} + +// longhornUnreadyNodes runs one orchestration or CLI step. +// Signature: (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error). +// Why: Longhorn node readiness can lag or intentionally differ from Kubernetes +// node readiness; attach recovery must use Longhorn's view for safety. +func (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error) { + out, err := o.kubectl(ctx, 30*time.Second, + "-n", "longhorn-system", + "get", "nodes.longhorn.io", + "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\t'}{range .status.conditions[?(@.type==\"Ready\")]}{.status}{end}{'\\n'}{end}", + ) + if err != nil { + if isNotFoundErr(err) { + return map[string]struct{}{}, nil + } + return nil, fmt.Errorf("query longhorn node readiness: %w", err) + } + unready := map[string]struct{}{} + for _, line := range lines(out) { + fields := strings.Fields(line) + if len(fields) < 2 { + continue + } + if !strings.EqualFold(strings.TrimSpace(fields[1]), "True") { + unready[strings.TrimSpace(fields[0])] = struct{}{} + } + } + return unready, nil +} + // podControllerOwned runs one orchestration or CLI step. // Signature: podControllerOwned(p podResource) bool. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.