package cluster

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"
)

// waitForCriticalServiceEndpoints blocks until every configured critical
// service has at least one ready endpoint address, attempting bounded
// auto-healing of missing backends along the way.
// Signature: (o *Orchestrator) waitForCriticalServiceEndpoints(ctx context.Context) error.
// Why: some externally-healthy services (like Grafana) still require backend
// in-cluster dependencies; endpoint checks catch this drift before startup passes.
func (o *Orchestrator) waitForCriticalServiceEndpoints(ctx context.Context) error {
	// Total wait budget from config; non-positive values fall back to 7 minutes.
	wait := time.Duration(o.cfg.Startup.CriticalServiceEndpointWaitSec) * time.Second
	if wait <= 0 {
		wait = 7 * time.Minute
	}
	// Poll interval from config; non-positive values fall back to 5 seconds.
	poll := time.Duration(o.cfg.Startup.CriticalServiceEndpointPollSec) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	deadline := time.Now().Add(wait)
	lastFailure := "unknown"       // most recent failure detail, surfaced in logs and the final error
	lastLogged := time.Time{}      // throttles progress logging to one line per 30s
	lastHealAttempt := time.Time{} // throttles auto-heal attempts to one per 30s
	for {
		ready, detail, failedNamespace, failedService, err := o.criticalServiceEndpointsReady(ctx)
		// Checker errors are not fatal here: record them and keep polling until
		// the deadline, so transient query failures don't abort startup early.
		if err != nil {
			lastFailure = err.Error()
		} else {
			lastFailure = detail
		}
		if ready {
			o.log.Printf("critical service endpoint checklist passed (%s)", detail)
			return nil
		}
		now := time.Now()
		// Attempt auto-heal only for real runs, only when the checker identified
		// a specific failing service, and at most once every 30 seconds.
		if !o.runner.DryRun && failedNamespace != "" && failedService != "" && (lastHealAttempt.IsZero() || now.Sub(lastHealAttempt) >= 30*time.Second) {
			lastHealAttempt = now
			healed, healErr := o.maybeHealCriticalEndpointBackends(ctx, failedNamespace, failedService)
			if healErr != nil {
				// Heal failures are logged but non-fatal; the deadline still governs.
				o.log.Printf("warning: critical endpoint backend auto-heal failed for %s/%s: %v", failedNamespace, failedService, healErr)
			}
			if len(healed) > 0 {
				// Sort for deterministic log output before recording the heal.
				sort.Strings(healed)
				healDetail := fmt.Sprintf("restored critical endpoint backends: %s", joinLimited(healed, 8))
				o.log.Printf("%s", healDetail)
				o.noteStartupAutoHeal(healDetail)
			}
		}
		// Periodic progress log with remaining time, clamped at zero.
		if now.Sub(lastLogged) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for critical service endpoints (%s remaining): %s", remaining, lastFailure)
			lastLogged = now
		}
		// Deadline check happens after the heal/log steps so a final attempt and
		// status line are emitted before giving up.
		if time.Now().After(deadline) {
			return fmt.Errorf("startup blocked: critical service endpoint checklist not satisfied within %s (%s)", wait, lastFailure)
		}
		// Cancellable sleep between polls.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}

// criticalServiceEndpointsReady reports whether every configured critical
// service endpoint has at least one backend address. On failure it returns a
// human-readable detail plus the namespace/service of the first failing entry
// (empty strings when not applicable).
// Signature: (o *Orchestrator) criticalServiceEndpointsReady(ctx context.Context) (bool, string, string, string, error).
// Why: startup should only declare success when critical services have active
// backend addresses, not just rendered objects.
func (o *Orchestrator) criticalServiceEndpointsReady(ctx context.Context) (bool, string, string, string, error) {
	entries := o.cfg.Startup.CriticalServiceEndpoints
	// No configuration means nothing to gate on: vacuously ready.
	if len(entries) == 0 {
		return true, "no critical service endpoints configured", "", "", nil
	}
	for _, entry := range entries {
		namespace, service, err := parseCriticalServiceEndpoint(entry)
		if err != nil {
			// Config parse errors carry no failing namespace/service pair.
			return false, "", "", "", err
		}
		count, err := o.endpointAddressCount(ctx, namespace, service)
		if err != nil {
			// A missing Endpoints object is a normal "not ready yet" condition,
			// not an error — it is the very drift this check exists to catch.
			if isNotFoundErr(err) {
				return false, fmt.Sprintf("%s/%s not found", namespace, service), namespace, service, nil
			}
			return false, "", namespace, service, fmt.Errorf("query endpoints %s/%s: %w", namespace, service, err)
		}
		if count <= 0 {
			return false, fmt.Sprintf("%s/%s endpoints=0", namespace, service), namespace, service, nil
		}
	}
	return true, fmt.Sprintf("services=%d", len(entries)), "", "", nil
}

// maybeHealCriticalEndpointBackends runs one orchestration or CLI step.
// Signature: (o *Orchestrator) maybeHealCriticalEndpointBackends(ctx context.Context, namespace string, service string) ([]string, error).
// Why: endpoint-ready gating should include controlled self-healing for workloads
// that are configured to be available after startup.
func (o *Orchestrator) maybeHealCriticalEndpointBackends(ctx context.Context, namespace string, service string) ([]string, error) { namespace = strings.TrimSpace(namespace) service = strings.TrimSpace(service) if namespace == "" || service == "" { return nil, nil } healed := []string{} for _, kind := range []string{"deployment", "statefulset"} { workload := startupWorkload{Namespace: namespace, Kind: kind, Name: service} if err := o.ensureWorkloadReplicas(ctx, workload, 1); err != nil { if isNotFoundErr(err) { continue } return healed, fmt.Errorf("scale %s/%s/%s to 1: %w", namespace, kind, service, err) } if err := o.waitWorkloadReady(ctx, workload); err != nil { if !isNotFoundErr(err) { return healed, err } continue } healed = append(healed, namespace+"/"+kind+"/"+service) } return healed, nil } // endpointAddressCount runs one orchestration or CLI step. // Signature: (o *Orchestrator) endpointAddressCount(ctx context.Context, namespace string, service string) (int, error). // Why: endpoint address counts provide an objective service-backend readiness // signal independent of ingress/controller Ready conditions. func (o *Orchestrator) endpointAddressCount(ctx context.Context, namespace string, service string) (int, error) { out, err := o.kubectl( ctx, 20*time.Second, "-n", namespace, "get", "endpoints", service, "-o", `jsonpath={range .subsets[*].addresses[*]}{.ip}{"\n"}{end}`, ) if err != nil { return 0, err } return len(lines(out)), nil } // parseCriticalServiceEndpoint runs one orchestration or CLI step. // Signature: parseCriticalServiceEndpoint(entry string) (string, string, error). // Why: endpoint config should be parsed consistently so validation and runtime // checks agree on namespace/service identity. 
// parseCriticalServiceEndpoint parses a "namespace/service" config entry into
// its namespace and service components.
//
// Both components are whitespace-trimmed and must be non-empty. Entries with
// more than one "/" are rejected outright: Kubernetes namespaces and service
// names cannot contain slashes, so accepting e.g. "ns/svc/extra" (as the old
// SplitN-based parse did, yielding service "svc/extra") only deferred the
// failure to the kubectl query at runtime.
// Why: endpoint config should be parsed consistently so validation and runtime
// checks agree on namespace/service identity.
func parseCriticalServiceEndpoint(entry string) (string, string, error) {
	entry = strings.TrimSpace(entry)
	// Split on every "/" and require exactly two parts, which rejects both
	// missing and extra separators with the same error.
	parts := strings.Split(entry, "/")
	if len(parts) != 2 {
		return "", "", fmt.Errorf("invalid startup.critical_service_endpoints entry %q: expected namespace/service", entry)
	}
	namespace := strings.TrimSpace(parts[0])
	service := strings.TrimSpace(parts[1])
	if namespace == "" || service == "" {
		return "", "", fmt.Errorf("invalid startup.critical_service_endpoints entry %q: namespace and service must be non-empty", entry)
	}
	return namespace, service, nil
}