recovery: keep flux held before safe resume

This commit is contained in:
codex 2026-06-18 22:08:14 -03:00
parent 61dc0a9ef4
commit 904f6b1a62
6 changed files with 263 additions and 19 deletions

View File

@ -280,7 +280,8 @@ startup:
- harbor
ignore_workload_namespaces: []
ignore_workloads: []
ignore_unavailable_nodes: []
ignore_unavailable_nodes:
- titan-09
auto_recycle_stuck_pods: true
auto_quarantine_scheduling_storms: true
scheduling_storm_event_threshold: 30

View File

@ -280,7 +280,8 @@ startup:
- harbor
ignore_workload_namespaces: []
ignore_workloads: []
ignore_unavailable_nodes: []
ignore_unavailable_nodes:
- titan-09
auto_recycle_stuck_pods: true
auto_quarantine_scheduling_storms: true
scheduling_storm_event_threshold: 30

View File

@ -162,24 +162,35 @@ func (o *Orchestrator) waitWorkloadReady(ctx context.Context, w startupWorkload)
return o.waitVaultReady(ctx, w)
}
timeout := "240s"
if w.Kind == "statefulset" {
timeout = "360s"
deadline := time.Now().Add(7 * time.Minute)
var lastErr error
for time.Now().Before(deadline) {
o.bestEffort("recycle stuck critical workload pods", func() error { return o.recycleStuckControllerPods(ctx) })
_, err := o.kubectl(
ctx,
45*time.Second,
"-n",
w.Namespace,
"rollout",
"status",
fmt.Sprintf("%s/%s", w.Kind, w.Name),
"--timeout=30s",
)
if err == nil {
return nil
}
lastErr = err
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
}
}
_, err := o.kubectl(
ctx,
7*time.Minute,
"-n",
w.Namespace,
"rollout",
"status",
fmt.Sprintf("%s/%s", w.Kind, w.Name),
"--timeout="+timeout,
)
if err != nil {
return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
if lastErr != nil {
return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, lastErr)
}
return nil
return fmt.Errorf("wait ready %s/%s/%s: timeout", w.Namespace, w.Kind, w.Name)
}
// waitVaultReady runs one orchestration or CLI step.

View File

@ -45,12 +45,17 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er
}
o.noteStartupCheck("node-inventory-reachability", true, "all expected nodes responded over SSH")
resumedFlux := false
allowFailedStartupFluxResume := false
defer func() {
if o.runner.DryRun || err == nil || resumedFlux {
return
}
o.log.Printf("warning: startup failed before normal flux resume; attempting best-effort recovery resume")
o.bestEffort("restore scaled workloads after failed startup", func() error { return o.restoreScaledApps(ctx) })
if !allowFailedStartupFluxResume {
o.log.Printf("warning: startup failed before recovery-safe flux resume; leaving flux suspension state unchanged")
return
}
o.log.Printf("warning: startup failed during flux resume; attempting best-effort recovery resume")
o.bestEffort("resume flux after failed startup", func() error { return o.resumeFluxAndReconcile(ctx) })
}()
@ -292,6 +297,7 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er
o.noteStartupCheckState("flux-resume-reconcile", "running", "resuming flux and waiting for reconcile")
o.setStartupPhase("flux-resume", "resuming flux controllers and reconciling kustomizations")
allowFailedStartupFluxResume = true
if err := o.resumeFluxAndReconcile(ctx); err != nil {
o.noteStartupCheck("flux-resume-reconcile", false, err.Error())
return err

View File

@ -8,6 +8,7 @@ import (
"net"
"path/filepath"
"strings"
"sync"
"testing"
"time"
@ -73,6 +74,109 @@ func matchContains(cmd string, parts ...string) func(string, []string) bool {
}
}
// TestStartupEarlyFailureLeavesFluxSuspensionUnchanged runs one orchestration or CLI step.
// Signature: TestStartupEarlyFailureLeavesFluxSuspensionUnchanged(t *testing.T).
// Why: recovery must not release Flux when bootstrap fails before storage and
// critical workloads are ready, or Flux can re-create the same dependency loop.
func TestStartupEarlyFailureLeavesFluxSuspensionUnchanged(t *testing.T) {
tmpDir := t.TempDir()
cfg := config.Config{
SSHPort: 2277,
Startup: config.Startup{
APIWaitSeconds: 1,
APIPollSeconds: 1,
RequireNodeInventoryReach: false,
RequireTimeSync: false,
RequireNodeSSHAuth: false,
ReconcileAccessOnBoot: false,
AutoEtcdRestoreOnAPIFailure: false,
RequiredNodeLabels: map[string]map[string]string{
"titan-missing": {
"node-role.kubernetes.io/worker": "true",
},
},
},
State: config.State{
Dir: tmpDir,
ReportsDir: filepath.Join(tmpDir, "reports"),
RunHistoryPath: filepath.Join(tmpDir, "runs.json"),
LockPath: filepath.Join(tmpDir, "ananke.lock"),
IntentPath: filepath.Join(tmpDir, "intent.json"),
},
}
var mu sync.Mutex
calls := []string{}
orch := buildOrchestratorWithStubs(t, cfg, []commandStub{
{
match: func(name string, args []string) bool {
mu.Lock()
calls = append(calls, name+" "+strings.Join(args, " "))
mu.Unlock()
return false
},
},
{match: matchContains("kubectl", "version", "--request-timeout=5s"), out: "ok"},
{match: matchContains("kubectl", "-n", "vault", "get", "pod", "vault-0"), out: "Pending"},
{
match: matchContains("kubectl", "label", "node", "titan-missing"),
err: errors.New(`nodes "titan-missing" not found`),
},
})
err := orch.Startup(context.Background(), StartupOptions{Reason: "test early failure"})
if err == nil {
t.Fatalf("expected startup to fail before flux resume")
}
if !strings.Contains(err.Error(), "ensure required node labels on titan-missing") {
t.Fatalf("expected required-label failure, got: %v", err)
}
mu.Lock()
defer mu.Unlock()
for _, call := range calls {
if strings.Contains(call, `"suspend":false`) {
t.Fatalf("early failed startup unexpectedly resumed flux via call: %s", call)
}
}
}
// TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods runs one orchestration or CLI step.
// Signature: TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T).
// Why: Pending Longhorn-backed pods on Longhorn-unready nodes should be
// rescheduled without mutating Longhorn volume, replica, or disk objects.
func TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T) {
created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339)
lastSeen := time.Now().UTC().Format(time.RFC3339)
pods := `{"items":[{"metadata":{"namespace":"monitoring","name":"victoria-metrics-single-server-0","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"StatefulSet","name":"victoria-metrics-single-server"}]},"spec":{"nodeName":"titan-0b"},"status":{"phase":"Pending"}}]}`
events := `{"items":[{"metadata":{"namespace":"monitoring","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"victoria-metrics-single-server-0"},"type":"Warning","reason":"FailedAttachVolume","message":"AttachVolume.Attach failed for volume \"pvc-1\" : rpc error from [http://longhorn-backend:9500/v1/volumes/pvc-1?action=attach]: unable to attach volume pvc-1 to titan-0b: node titan-0b is not ready","lastTimestamp":"` + lastSeen + `"}]}`
deleted := false
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{StuckPodGraceSeconds: 180},
}, []commandStub{
{match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods},
{match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-0b\tFalse\n"},
{match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "monitoring", "delete", "pod", "victoria-metrics-single-server-0", "--wait=false")(name, args) {
return false
}
deleted = true
return true
},
},
})
if err := orch.recycleStuckControllerPods(context.Background()); err != nil {
t.Fatalf("recycleStuckControllerPods failed: %v", err)
}
if !deleted {
t.Fatalf("expected longhorn attach-blocked pending pod to be recycled")
}
}
// TestNewConstructsOrchestrator runs one orchestration or CLI step.
// Signature: TestNewConstructsOrchestrator(t *testing.T).
// Why: covers constructor path in orchestrator core module.

View File

@ -170,6 +170,12 @@ func (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error {
"CreateContainerConfigError": {},
"CreateContainerError": {},
}
longhornAttachReasons := map[string]string{}
if reasons, scanErr := o.longhornAttachBlockedPodReasons(ctx, list, grace); scanErr != nil {
o.log.Printf("warning: longhorn attach-blocked pod scan failed: %v", scanErr)
} else {
longhornAttachReasons = reasons
}
recycled := []string{}
for _, pod := range list.Items {
ns := strings.TrimSpace(pod.Metadata.Namespace)
@ -197,6 +203,9 @@ func (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error {
if reason == "" {
reason = stuckVaultInitReason(pod, grace)
}
if reason == "" {
reason = longhornAttachReasons[ns+"/"+name]
}
if reason == "" {
continue
}
@ -215,6 +224,118 @@ func (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error {
return nil
}
// longhornAttachBlockedPodReasons runs one orchestration or CLI step.
// Signature: (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error).
// Why: after a power event, Kubernetes can schedule a Longhorn-backed pod onto a
// node Longhorn still marks unready. Recycling the unattached Pending pod lets
// the scheduler pick a Longhorn-ready node without touching Longhorn data-plane
// objects.
func (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) {
unreadyNodes, err := o.longhornUnreadyNodes(ctx)
if err != nil {
return nil, err
}
if len(unreadyNodes) == 0 {
return map[string]string{}, nil
}
eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json")
if err != nil {
return nil, fmt.Errorf("query events for longhorn attach-blocked pod scan: %w", err)
}
var events eventList
if err := json.Unmarshal([]byte(eventsOut), &events); err != nil {
return nil, fmt.Errorf("decode events for longhorn attach-blocked pod scan: %w", err)
}
podsByKey := map[string]podResource{}
for _, pod := range pods.Items {
ns := strings.TrimSpace(pod.Metadata.Namespace)
name := strings.TrimSpace(pod.Metadata.Name)
node := strings.TrimSpace(pod.Spec.NodeName)
if ns == "" || name == "" || node == "" {
continue
}
if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") {
continue
}
if _, unready := unreadyNodes[node]; !unready {
continue
}
if !podControllerOwned(pod) {
continue
}
if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace {
continue
}
podsByKey[ns+"/"+name] = pod
}
if len(podsByKey) == 0 {
return map[string]string{}, nil
}
reasons := map[string]string{}
for _, event := range events.Items {
if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") {
continue
}
if strings.TrimSpace(event.Reason) != "FailedAttachVolume" {
continue
}
if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") {
continue
}
key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name)
pod, ok := podsByKey[key]
if !ok {
continue
}
lastSeen := eventLastObservedAt(event)
if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && lastSeen.Before(pod.Metadata.CreationTimestamp) {
continue
}
node := strings.TrimSpace(pod.Spec.NodeName)
message := strings.ToLower(strings.TrimSpace(event.Message))
if !strings.Contains(message, "longhorn-backend") || !strings.Contains(message, "failed for volume") {
continue
}
if !strings.Contains(message, "node "+strings.ToLower(node)+" is not ready") {
continue
}
reasons[key] = "LonghornAttachBlockedOnUnreadyNode:" + node
}
return reasons, nil
}
// longhornUnreadyNodes runs one orchestration or CLI step.
// Signature: (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error).
// Why: Longhorn node readiness can lag or intentionally differ from Kubernetes
// node readiness; attach recovery must use Longhorn's view for safety.
func (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error) {
out, err := o.kubectl(ctx, 30*time.Second,
"-n", "longhorn-system",
"get", "nodes.longhorn.io",
"-o", "jsonpath={range .items[*]}{.metadata.name}{'\\t'}{range .status.conditions[?(@.type==\"Ready\")]}{.status}{end}{'\\n'}{end}",
)
if err != nil {
if isNotFoundErr(err) {
return map[string]struct{}{}, nil
}
return nil, fmt.Errorf("query longhorn node readiness: %w", err)
}
unready := map[string]struct{}{}
for _, line := range lines(out) {
fields := strings.Fields(line)
if len(fields) < 2 {
continue
}
if !strings.EqualFold(strings.TrimSpace(fields[1]), "True") {
unready[strings.TrimSpace(fields[0])] = struct{}{}
}
}
return unready, nil
}
// podControllerOwned runs one orchestration or CLI step.
// Signature: podControllerOwned(p podResource) bool.
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.