ananke/internal/cluster/orchestrator_service_stability.go

418 lines
14 KiB
Go

package cluster
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
"unicode"
"scm.bstein.dev/bstein/ananke/internal/config"
)
// isLikelyHostname reports whether value looks like a dotted hostname.
// Signature: isLikelyHostname(value string) bool.
// Why: a cheap textual screen only. After trimming surrounding whitespace the
// value must be non-empty, contain neither a space nor a slash, and contain at
// least one dot; no DNS syntax validation or resolution is performed.
func isLikelyHostname(value string) bool {
	candidate := strings.TrimSpace(value)
	switch {
	case candidate == "":
		return false
	case strings.ContainsAny(candidate, " /"):
		// Embedded spaces or path separators rule out a bare hostname.
		return false
	default:
		return strings.Contains(candidate, ".")
	}
}
// healIngressHostBackendReplicas scales deployments and statefulsets that are
// set to fewer than one replica back up to one, limited to the namespaces that
// discoverIngressNamespacesForHost reports for host.
// Signature: (o *Orchestrator) healIngressHostBackendReplicas(ctx context.Context, host string) ([]string, error).
// Why: an ingress host cannot answer while every backend in its namespace is
// scaled to zero, so the startup path restores a minimal replica count.
//
// Returns the "namespace/kind/name" identifiers of the workloads that were
// scaled. It returns (nil, nil) when no namespace is found for host. When a
// scale call fails, the identifiers healed so far are returned together with
// the error; workloads that report not-found are skipped silently.
func (o *Orchestrator) healIngressHostBackendReplicas(ctx context.Context, host string) ([]string, error) {
	namespaces, err := o.discoverIngressNamespacesForHost(ctx, host)
	if err != nil {
		return nil, err
	}
	if len(namespaces) == 0 {
		return nil, nil
	}
	inScope := makeStringSet(namespaces)

	raw, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset", "-A", "-o", "json")
	if err != nil {
		return nil, fmt.Errorf("query workloads: %w", err)
	}
	var workloads workloadList
	if err := json.Unmarshal([]byte(raw), &workloads); err != nil {
		return nil, fmt.Errorf("decode workloads: %w", err)
	}

	healed := []string{}
	for _, item := range workloads.Items {
		kind := strings.ToLower(strings.TrimSpace(item.Kind))
		switch kind {
		case "deployment", "statefulset":
			// Supported workload kinds; anything else (including empty) is skipped.
		default:
			continue
		}
		namespace := strings.TrimSpace(item.Metadata.Namespace)
		name := strings.TrimSpace(item.Metadata.Name)
		if namespace == "" || name == "" {
			continue
		}
		if _, wanted := inScope[namespace]; !wanted {
			continue
		}
		// A missing replica count is treated as 1, i.e. already scaled up.
		if item.Spec.Replicas == nil || *item.Spec.Replicas >= 1 {
			continue
		}

		target := startupWorkload{Namespace: namespace, Kind: kind, Name: name}
		err := o.ensureWorkloadReplicas(ctx, target, 1)
		switch {
		case err == nil:
			healed = append(healed, strings.Join([]string{namespace, kind, name}, "/"))
		case isNotFoundErr(err):
			// The workload vanished between listing and scaling; nothing to heal.
		default:
			return healed, fmt.Errorf("scale %s/%s/%s to 1: %w", namespace, kind, name, err)
		}
	}
	return healed, nil
}
// waitForServiceChecklist polls serviceChecklistReady until it passes, the
// wait budget is exhausted, or ctx is cancelled.
// Signature: (o *Orchestrator) waitForServiceChecklist(ctx context.Context) error.
// Why: startup must not be declared complete while configured external
// service probes still fail.
//
// The budget comes from Startup.ServiceChecklistWaitSeconds (7 minutes when
// not positive) and the poll interval from Startup.ServiceChecklistPollSeconds
// (5 seconds when not positive). Every iteration first invokes the pod-recycle
// and replica-heal helpers, and after a failed check the ingress-heal helper
// with the latest failure text. Progress is logged when the failure text
// changes or at least every 30 seconds. Returns nil on success, ctx.Err() on
// cancellation, or a "startup blocked" error once the deadline has passed.
func (o *Orchestrator) waitForServiceChecklist(ctx context.Context) error {
	budget := time.Duration(o.cfg.Startup.ServiceChecklistWaitSeconds) * time.Second
	if budget <= 0 {
		budget = 7 * time.Minute
	}
	interval := time.Duration(o.cfg.Startup.ServiceChecklistPollSeconds) * time.Second
	if interval <= 0 {
		interval = 5 * time.Second
	}

	var (
		deadline = time.Now().Add(budget)
		failure  = "unknown"

		// Zero timestamps; the heal helpers receive pointers to theirs.
		loggedAt      time.Time
		recycledAt    time.Time
		replicaHealAt time.Time
		ingressHealAt time.Time
	)

	for {
		o.maybeAutoRecycleStuckPods(ctx, &recycledAt)
		o.maybeAutoHealCriticalWorkloadReplicas(ctx, &replicaHealAt)

		previous := failure
		ready, detail := o.serviceChecklistReady(ctx)
		failure = detail
		if ready {
			o.log.Printf("external service checklist passed (%s)", detail)
			return nil
		}

		o.maybeAutoHealIngressHostBackends(ctx, &ingressHealAt, failure)

		changed := failure != previous
		if changed || time.Since(loggedAt) >= 30*time.Second {
			left := time.Until(deadline).Round(time.Second)
			if left < 0 {
				left = 0
			}
			o.log.Printf("waiting for external service checklist (%s remaining): %s", left, failure)
			loggedAt = time.Now()
		}

		if time.Now().After(deadline) {
			return fmt.Errorf("startup blocked: external service checklist not satisfied within %s (%s)", budget, failure)
		}

		select {
		case <-time.After(interval):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// serviceChecklistReady evaluates every entry of Startup.ServiceChecklist in
// order and stops at the first one that fails.
// Signature: (o *Orchestrator) serviceChecklistReady(ctx context.Context) (bool, string).
// Why: callers need a single pass/fail verdict plus a short human-readable
// detail for logging.
//
// Returns (true, "no checklist items configured") for an empty checklist,
// (true, "checks=N") when all entries pass, and (false, "<label>: <detail>")
// for the first failing entry, where the label is the trimmed check name or,
// when that is empty, the trimmed check URL.
func (o *Orchestrator) serviceChecklistReady(ctx context.Context) (bool, string) {
	items := o.cfg.Startup.ServiceChecklist
	if len(items) == 0 {
		return true, "no checklist items configured"
	}
	for _, item := range items {
		ok, detail := o.serviceCheckReady(ctx, item)
		if ok {
			continue
		}
		label := strings.TrimSpace(item.Name)
		if label == "" {
			label = strings.TrimSpace(item.URL)
		}
		return false, fmt.Sprintf("%s: %s", label, detail)
	}
	return true, fmt.Sprintf("checks=%d", len(items))
}
// serviceCheckReady probes one checklist entry over HTTP and validates the
// response against the entry's expectations.
// Signature: (o *Orchestrator) serviceCheckReady(ctx context.Context, check config.ServiceChecklistCheck) (bool, string).
// Why: a bare status code is not enough to tell a healthy service from an
// error page or a wrong redirect, so optional marker assertions are applied.
//
// Validation order: probe error, status code (check.AcceptedStatuses, or a
// built-in list of 2xx/3xx/401/403 codes when none are configured), then the
// Location, final-URL and body markers. Markers are trimmed, skipped when
// empty, and compared with checklistContains. Returns (true, "status=N") on
// success or (false, reason) for the first expectation that is not met.
func (o *Orchestrator) serviceCheckReady(ctx context.Context, check config.ServiceChecklistCheck) (bool, string) {
	probe, err := o.httpChecklistProbeResult(ctx, check)
	if err != nil {
		return false, err.Error()
	}

	allowed := check.AcceptedStatuses
	if len(allowed) == 0 {
		allowed = []int{200, 201, 202, 203, 204, 301, 302, 303, 307, 308, 401, 403}
	}
	matched := false
	for i := 0; i < len(allowed) && !matched; i++ {
		matched = probe.Status == allowed[i]
	}
	if !matched {
		return false, fmt.Sprintf("unexpected status code=%d", probe.Status)
	}

	// Each row: text to search, configured marker, whether the marker must be
	// present (true) or absent (false), and the failure message format.
	assertions := []struct {
		subject     string
		marker      string
		mustContain bool
		failure     string
	}{
		{probe.Location, check.LocationContains, true, "location header missing expected marker %q"},
		{probe.Location, check.LocationNotContains, false, "location header contained forbidden marker %q"},
		{probe.FinalURL, check.FinalURLContains, true, "final url missing expected marker %q"},
		{probe.FinalURL, check.FinalURLNotContains, false, "final url contained forbidden marker %q"},
		{probe.Body, check.BodyContains, true, "response missing expected marker %q"},
		{probe.Body, check.BodyNotContains, false, "response contained forbidden marker %q"},
	}
	for _, a := range assertions {
		marker := strings.TrimSpace(a.marker)
		if marker == "" {
			continue
		}
		if checklistContains(a.subject, marker) != a.mustContain {
			return false, fmt.Sprintf(a.failure, marker)
		}
	}
	return true, fmt.Sprintf("status=%d", probe.Status)
}
// checklistHTTPProbeResult bundles what a checklist HTTP probe observed.
//
// Status is the HTTP status code, Body the response body text, Location the
// value of the Location response header, and FinalURL the URL the response is
// attributed to.
type checklistHTTPProbeResult struct {
	Status                   int
	Body, Location, FinalURL string
}
// httpChecklistProbeResult runs httpChecklistProbeWithLocation and packs its
// positional return values into a checklistHTTPProbeResult.
// Signature: (o *Orchestrator) httpChecklistProbeResult(ctx context.Context, check config.ServiceChecklistCheck) (checklistHTTPProbeResult, error).
// Why: checklist checks need response headers (for redirect verification) in
// addition to status/body so startup can validate real user-facing behavior.
//
// On error the zero-value result is returned alongside the error.
func (o *Orchestrator) httpChecklistProbeResult(ctx context.Context, check config.ServiceChecklistCheck) (checklistHTTPProbeResult, error) {
	code, payload, redirectTarget, landedURL, err := o.httpChecklistProbeWithLocation(ctx, check)
	if err != nil {
		return checklistHTTPProbeResult{}, err
	}
	return checklistHTTPProbeResult{
		Status:   code,
		Body:     payload,
		Location: redirectTarget,
		FinalURL: landedURL,
	}, nil
}
// httpChecklistProbe performs the same probe as httpChecklistProbeWithLocation
// but hands back only the status code, the body and the error.
// Signature: (o *Orchestrator) httpChecklistProbe(ctx context.Context, check config.ServiceChecklistCheck) (int, string, error).
// Why: a narrower return shape for callers that do not inspect the Location
// header or the final URL.
func (o *Orchestrator) httpChecklistProbe(ctx context.Context, check config.ServiceChecklistCheck) (int, string, error) {
	code, payload, _, _, probeErr := o.httpChecklistProbeWithLocation(ctx, check)
	return code, payload, probeErr
}
// httpChecklistProbeWithLocation issues a single GET against check.URL and
// reports what came back.
// Signature: (o *Orchestrator) httpChecklistProbeWithLocation(ctx context.Context, check config.ServiceChecklistCheck) (int, string, string, string, error).
// Why: redirects and auth gates require location-header assertions to prevent
// startup false-positives on partially healthy protected services.
//
// Behavior:
//   - The client timeout is check.TimeoutSeconds, or 12 seconds when that is
//     not positive.
//   - Redirects are followed only when check.FollowRedirects or
//     check.RequireRobotAuth is set; otherwise the first response is returned
//     as-is so its Location header can be inspected.
//   - With check.RequireRobotAuth the client comes from
//     checklistAuthHTTPClient; otherwise a throwaway client is built, with TLS
//     verification disabled when check.InsecureSkipTLS is set.
//   - At most 64 KiB of the response body is read.
//
// Returns the status code, the body text, the trimmed Location header, the
// URL of the request that produced the final response (falling back to the
// original request URL), and an error. If reading the body fails, the status
// code is still returned together with the error.
func (o *Orchestrator) httpChecklistProbeWithLocation(ctx context.Context, check config.ServiceChecklistCheck) (int, string, string, string, error) {
	timeout := time.Duration(check.TimeoutSeconds) * time.Second
	if timeout <= 0 {
		timeout = 12 * time.Second
	}
	followRedirects := check.FollowRedirects || check.RequireRobotAuth

	var client *http.Client
	if check.RequireRobotAuth {
		authClient, authErr := o.checklistAuthHTTPClient(ctx, timeout, check.InsecureSkipTLS)
		if authErr != nil {
			return 0, "", "", "", fmt.Errorf("initialize robotuser checklist session: %w", authErr)
		}
		client = authClient
	} else {
		transport := &http.Transport{}
		if check.InsecureSkipTLS {
			transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
		}
		// This transport lives only for the current probe. A zero-value
		// Transport keeps idle keep-alive connections with no idle timeout, so
		// without an explicit release every poll iteration could strand a
		// connection in a pool that is never used again.
		defer transport.CloseIdleConnections()
		client = &http.Client{
			Timeout:   timeout,
			Transport: transport,
		}
	}
	if !followRedirects {
		// Surface the redirect response itself instead of chasing it.
		client.CheckRedirect = func(_ *http.Request, _ []*http.Request) error {
			return http.ErrUseLastResponse
		}
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, strings.TrimSpace(check.URL), nil)
	if err != nil {
		return 0, "", "", "", fmt.Errorf("build request: %w", err)
	}
	req.Header.Set("User-Agent", "ananke/startup-checklist")

	resp, err := client.Do(req)
	if err != nil {
		return 0, "", "", "", fmt.Errorf("request failed: %w", err)
	}
	defer resp.Body.Close()

	// Cap the read so an oversized response cannot balloon memory.
	body, readErr := io.ReadAll(io.LimitReader(resp.Body, 64*1024))
	if readErr != nil {
		return resp.StatusCode, "", "", "", fmt.Errorf("read response body: %w", readErr)
	}

	// After followed redirects resp.Request is the last request sent, so its
	// URL is where the probe actually ended up.
	finalURL := strings.TrimSpace(req.URL.String())
	if resp.Request != nil && resp.Request.URL != nil {
		finalURL = strings.TrimSpace(resp.Request.URL.String())
	}
	return resp.StatusCode, string(body), strings.TrimSpace(resp.Header.Get("Location")), finalURL, nil
}
// checklistContains reports whether marker occurs in body, ignoring letter
// case and, as a fallback, ignoring all whitespace.
// Signature: checklistContains(body, marker string) bool.
// Why: markers copied from rendered pages or headers often differ from the
// live response only in casing or spacing.
//
// A case-insensitive substring match is tried first. If that fails, both
// strings are passed through compactLowerNoSpace and compared again; a marker
// that is empty after that step matches any body.
func checklistContains(body, marker string) bool {
	haystack, needle := strings.ToLower(body), strings.ToLower(marker)
	if strings.Contains(haystack, needle) {
		return true
	}
	squeezedNeedle := compactLowerNoSpace(needle)
	return squeezedNeedle == "" || strings.Contains(compactLowerNoSpace(haystack), squeezedNeedle)
}
// compactLowerNoSpace returns s with every Unicode whitespace rune removed.
// Signature: compactLowerNoSpace(s string) string.
// Why: gives marker comparisons a whitespace-insensitive form. Despite the
// name, no case conversion happens here; the input is returned with its
// original letter case.
func compactLowerNoSpace(s string) string {
	return strings.Map(func(r rune) rune {
		if unicode.IsSpace(r) {
			return -1 // a negative result drops the rune
		}
		return r
	}, s)
}
// waitForStabilityWindow keeps re-checking startupStabilityHealthy for the
// configured soak period and fails on the first unhealthy result.
// Signature: (o *Orchestrator) waitForStabilityWindow(ctx context.Context) error.
// Why: a cluster that passes its checks once can still regress moments later;
// the soak demands sustained health before startup is treated as done.
//
// The period is Startup.ServiceChecklistStabilitySec seconds; a non-positive
// value disables the soak and returns nil immediately. The poll interval is
// Startup.ServiceChecklistPollSeconds (5 seconds when not positive). Every
// iteration first invokes the pod-recycle and replica-heal helpers. Progress
// is logged at most every 30 seconds. Returns nil once the deadline has
// passed with all checks healthy, a wrapped error on the first failed check,
// or ctx.Err() on cancellation.
func (o *Orchestrator) waitForStabilityWindow(ctx context.Context) error {
	soak := time.Duration(o.cfg.Startup.ServiceChecklistStabilitySec) * time.Second
	if soak <= 0 {
		return nil
	}
	interval := time.Duration(o.cfg.Startup.ServiceChecklistPollSeconds) * time.Second
	if interval <= 0 {
		interval = 5 * time.Second
	}

	deadline := time.Now().Add(soak)
	// Zero timestamps; the heal helpers receive pointers to theirs.
	var statusLoggedAt, recycledAt, replicaHealAt time.Time

	for {
		o.maybeAutoRecycleStuckPods(ctx, &recycledAt)
		o.maybeAutoHealCriticalWorkloadReplicas(ctx, &replicaHealAt)

		if err := o.startupStabilityHealthy(ctx); err != nil {
			return fmt.Errorf("startup stability window failed: %w", err)
		}
		if time.Now().After(deadline) {
			o.log.Printf("startup stability window passed (%s)", soak)
			return nil
		}

		if time.Since(statusLoggedAt) >= 30*time.Second {
			left := time.Until(deadline).Round(time.Second)
			if left < 0 {
				left = 0
			}
			o.log.Printf("startup stability soak in progress (%s remaining)", left)
			statusLoggedAt = time.Now()
		}

		select {
		case <-time.After(interval):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// startupStabilityHealthy runs every enabled startup health gate once and
// returns the first problem found, or nil when all of them pass.
// Signature: (o *Orchestrator) startupStabilityHealthy(ctx context.Context) error.
// Why: the stability soak needs one combined verdict it can poll repeatedly.
//
// Gates, in order, each guarded by its Startup.Require* flag: flux health,
// workload convergence, the service checklist and the ingress checklist. The
// startupFailurePods check always runs last; when it reports pods, the error
// lists them via joinLimited with a limit of 8.
func (o *Orchestrator) startupStabilityHealthy(ctx context.Context) error {
	if o.cfg.Startup.RequireFluxHealth {
		ready, detail, err := o.fluxHealthReady(ctx)
		switch {
		case err != nil:
			return fmt.Errorf("flux check error: %w", err)
		case !ready:
			return fmt.Errorf("flux not ready: %s", detail)
		}
	}

	if o.cfg.Startup.RequireWorkloadConvergence {
		ready, detail, err := o.workloadConvergenceReady(ctx)
		switch {
		case err != nil:
			return fmt.Errorf("workload check error: %w", err)
		case !ready:
			return fmt.Errorf("workloads not converged: %s", detail)
		}
	}

	if o.cfg.Startup.RequireServiceChecklist {
		if ready, detail := o.serviceChecklistReady(ctx); !ready {
			return fmt.Errorf("external services not healthy: %s", detail)
		}
	}

	if o.cfg.Startup.RequireIngressChecklist {
		if ready, detail := o.ingressChecklistReady(ctx); !ready {
			return fmt.Errorf("ingress reachability not healthy: %s", detail)
		}
	}

	crashing, err := o.startupFailurePods(ctx)
	if err != nil {
		return fmt.Errorf("pod failure check error: %w", err)
	}
	if len(crashing) == 0 {
		return nil
	}
	return fmt.Errorf("pods in crash/image-pull failures: %s", joinLimited(crashing, 8))
}