autoheal: repair broken kubelet proxies
This commit is contained in:
parent
0cbd9127d9
commit
0b4b05233e
@ -14,6 +14,9 @@ type nodeReadyList struct {
|
|||||||
Metadata struct {
|
Metadata struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
} `json:"metadata"`
|
} `json:"metadata"`
|
||||||
|
Spec struct {
|
||||||
|
Unschedulable bool `json:"unschedulable"`
|
||||||
|
} `json:"spec"`
|
||||||
Status struct {
|
Status struct {
|
||||||
Conditions []struct {
|
Conditions []struct {
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
@ -23,6 +26,11 @@ type nodeReadyList struct {
|
|||||||
} `json:"items"`
|
} `json:"items"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// readyNodeCandidate is the per-node summary that kubelet proxy repair works
// from: which node to probe and whether it was already cordoned beforehand.
type readyNodeCandidate struct {
	// Name is the node's metadata.name as reported by the API server.
	Name string

	// Unschedulable mirrors spec.unschedulable at listing time; when true the
	// repair leaves the node's cordon state untouched.
	Unschedulable bool
}
|
||||||
|
|
||||||
type podDeleteList struct {
|
type podDeleteList struct {
|
||||||
Items []struct {
|
Items []struct {
|
||||||
Metadata struct {
|
Metadata struct {
|
||||||
@ -77,6 +85,13 @@ func (o *Orchestrator) postStartAutoHeal(ctx context.Context) error {
|
|||||||
requestReconcile = true
|
requestReconcile = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
repairedProxies, err := o.repairBrokenKubeletProxies(ctx)
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, fmt.Sprintf("kubelet proxy auto-repair: %v", err))
|
||||||
|
} else if repairedProxies > 0 {
|
||||||
|
o.log.Printf("post-start auto-heal repaired %d broken kubelet proxy node(s)", repairedProxies)
|
||||||
|
}
|
||||||
|
|
||||||
if requestReconcile {
|
if requestReconcile {
|
||||||
o.bestEffort("request flux reconcile after post-start auto-heal", func() error {
|
o.bestEffort("request flux reconcile after post-start auto-heal", func() error {
|
||||||
return o.requestFluxReconcile(ctx)
|
return o.requestFluxReconcile(ctx)
|
||||||
@ -206,6 +221,154 @@ func (o *Orchestrator) cleanupTerminatingPodsOnUnavailableNodes(ctx context.Cont
|
|||||||
return count, nil
|
return count, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// repairBrokenKubeletProxies runs one orchestration or CLI step.
|
||||||
|
// Signature: (o *Orchestrator) repairBrokenKubeletProxies(ctx context.Context) (int, error).
|
||||||
|
// Why: a Ready node can still have a dead kubelet tunnel, which breaks Jenkins
|
||||||
|
// exec/websocket agents until the k3s agent is restarted on that exact node.
|
||||||
|
func (o *Orchestrator) repairBrokenKubeletProxies(ctx context.Context) (int, error) {
|
||||||
|
if o.runner.DryRun {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes, err := o.readyNodeCandidates(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
repaired := 0
|
||||||
|
errs := []string{}
|
||||||
|
for _, node := range nodes {
|
||||||
|
healthy, checkErr := o.kubeletProxyHealthy(ctx, node.Name)
|
||||||
|
if healthy {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if checkErr != nil && !isRepairableKubeletProxyErr(checkErr) {
|
||||||
|
errs = append(errs, fmt.Sprintf("%s proxy health check: %v", node.Name, checkErr))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !o.sshManaged(node.Name) {
|
||||||
|
errs = append(errs, fmt.Sprintf("%s proxy broken but node is not SSH-managed", node.Name))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !node.Unschedulable {
|
||||||
|
if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node.Name); err != nil {
|
||||||
|
errs = append(errs, fmt.Sprintf("%s cordon before kubelet restart: %v", node.Name, err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
o.log.Printf("warning: detected broken kubelet proxy on Ready node %s; restarting k3s-agent", node.Name)
|
||||||
|
if _, err := o.sshWithTimeout(ctx, node.Name, "sudo -n systemctl restart k3s-agent", 90*time.Second); err != nil {
|
||||||
|
if !node.Unschedulable {
|
||||||
|
o.bestEffort("uncordon node after failed kubelet proxy repair", func() error {
|
||||||
|
_, uncordonErr := o.kubectl(ctx, 20*time.Second, "uncordon", node.Name)
|
||||||
|
return uncordonErr
|
||||||
|
})
|
||||||
|
}
|
||||||
|
errs = append(errs, fmt.Sprintf("%s restart k3s-agent: %v", node.Name, err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, err := o.kubectl(ctx, 140*time.Second, "wait", "node/"+node.Name, "--for=condition=Ready", "--timeout=120s"); err != nil {
|
||||||
|
errs = append(errs, fmt.Sprintf("%s wait Ready after k3s-agent restart: %v", node.Name, err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
healthy, checkErr = o.kubeletProxyHealthy(ctx, node.Name)
|
||||||
|
if !healthy {
|
||||||
|
if checkErr != nil {
|
||||||
|
errs = append(errs, fmt.Sprintf("%s proxy still broken after k3s-agent restart: %v", node.Name, checkErr))
|
||||||
|
} else {
|
||||||
|
errs = append(errs, fmt.Sprintf("%s proxy still broken after k3s-agent restart", node.Name))
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !node.Unschedulable {
|
||||||
|
if _, err := o.kubectl(ctx, 20*time.Second, "uncordon", node.Name); err != nil {
|
||||||
|
errs = append(errs, fmt.Sprintf("%s uncordon after kubelet proxy repair: %v", node.Name, err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
repaired++
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(errs) > 0 {
|
||||||
|
return repaired, errors.New(strings.Join(errs, "; "))
|
||||||
|
}
|
||||||
|
return repaired, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// readyNodeCandidates runs one orchestration or CLI step.
|
||||||
|
// Signature: (o *Orchestrator) readyNodeCandidates(ctx context.Context) ([]readyNodeCandidate, error).
|
||||||
|
// Why: kubelet proxy repair should only touch nodes Kubernetes says are Ready,
|
||||||
|
// preserving existing cordons when a node was intentionally kept out of service.
|
||||||
|
func (o *Orchestrator) readyNodeCandidates(ctx context.Context) ([]readyNodeCandidate, error) {
|
||||||
|
out, err := o.kubectl(ctx, 20*time.Second, "get", "nodes", "-o", "json")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("query nodes: %w", err)
|
||||||
|
}
|
||||||
|
var nodes nodeReadyList
|
||||||
|
if err := json.Unmarshal([]byte(out), &nodes); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode nodes: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
readyNodes := []readyNodeCandidate{}
|
||||||
|
for _, item := range nodes.Items {
|
||||||
|
ready := ""
|
||||||
|
for _, cond := range item.Status.Conditions {
|
||||||
|
if strings.EqualFold(strings.TrimSpace(cond.Type), "Ready") {
|
||||||
|
ready = strings.TrimSpace(cond.Status)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ready == "True" && item.Metadata.Name != "" {
|
||||||
|
readyNodes = append(readyNodes, readyNodeCandidate{
|
||||||
|
Name: item.Metadata.Name,
|
||||||
|
Unschedulable: item.Spec.Unschedulable,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return readyNodes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// kubeletProxyHealthy runs one orchestration or CLI step.
|
||||||
|
// Signature: (o *Orchestrator) kubeletProxyHealthy(ctx context.Context, node string) (bool, error).
|
||||||
|
// Why: the apiserver node proxy is the path Jenkins uses for pod exec; checking
|
||||||
|
// it catches Ready-but-unusable nodes before agents start failing websockets.
|
||||||
|
func (o *Orchestrator) kubeletProxyHealthy(ctx context.Context, node string) (bool, error) {
|
||||||
|
out, err := o.kubectl(ctx, 10*time.Second, "get", "--raw", fmt.Sprintf("/api/v1/nodes/%s/proxy/healthz", node))
|
||||||
|
if err != nil {
|
||||||
|
if strings.TrimSpace(out) != "" {
|
||||||
|
return false, fmt.Errorf("%w: %s", err, strings.TrimSpace(out))
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// isRepairableKubeletProxyErr reports whether err's text contains one of the
// markers associated with a broken apiserver-to-kubelet tunnel.
// Signature: isRepairableKubeletProxyErr(err error) bool.
// Why: keep this repair narrow so Ananke restarts k3s-agent for the known
// kubelet-tunnel failure, not for every transient kubectl problem.
//
// Matching is a case-insensitive substring search over err.Error(); a nil
// error is never repairable.
// NOTE(review): the bare "502" and "10250" markers match anywhere in the
// message, including inside unrelated numbers — confirm that is acceptable.
func isRepairableKubeletProxyErr(err error) bool {
	if err == nil {
		return false
	}

	text := strings.ToLower(err.Error())
	switch {
	case strings.Contains(text, "502"),
		strings.Contains(text, "bad gateway"),
		strings.Contains(text, "failed to find session"),
		strings.Contains(text, "error trying to reach service"),
		strings.Contains(text, "10250"):
		return true
	}
	return false
}
|
||||||
|
|
||||||
// unavailableNodeSet runs one orchestration or CLI step.
|
// unavailableNodeSet runs one orchestration or CLI step.
|
||||||
// Signature: (o *Orchestrator) unavailableNodeSet(ctx context.Context) (map[string]struct{}, error).
|
// Signature: (o *Orchestrator) unavailableNodeSet(ctx context.Context) (map[string]struct{}, error).
|
||||||
// Why: isolates Ready-condition parsing so dead-node cleanup stays targeted.
|
// Why: isolates Ready-condition parsing so dead-node cleanup stays targeted.
|
||||||
|
|||||||
@ -190,6 +190,121 @@ func TestRunPostStartAutoHealDryRun(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestRepairBrokenKubeletProxiesRestartsSchedulableNode runs one orchestration or CLI step.
|
||||||
|
// Signature: TestRepairBrokenKubeletProxiesRestartsSchedulableNode(t *testing.T).
|
||||||
|
// Why: Jenkins agents depend on apiserver-to-kubelet exec; a Ready node with a
|
||||||
|
// broken proxy needs a narrow k3s-agent restart, not a broad cluster restart.
|
||||||
|
func TestRepairBrokenKubeletProxiesRestartsSchedulableNode(t *testing.T) {
|
||||||
|
cfg := config.Config{SSHManagedNodes: []string{"titan-07"}}
|
||||||
|
orch := buildOrchestratorWithStubs(t, cfg, nil)
|
||||||
|
|
||||||
|
healthChecks := 0
|
||||||
|
cordoned := false
|
||||||
|
restarted := false
|
||||||
|
waited := false
|
||||||
|
uncordoned := false
|
||||||
|
dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
|
||||||
|
joined := strings.Join(args, " ")
|
||||||
|
switch {
|
||||||
|
case name == "kubectl" && strings.Contains(joined, "get nodes -o json"):
|
||||||
|
return `{"items":[{"metadata":{"name":"titan-07"},"spec":{"unschedulable":false},"status":{"conditions":[{"type":"Ready","status":"True"}]}},{"metadata":{"name":"titan-18"},"status":{"conditions":[{"type":"Ready","status":"Unknown"}]}}]}`, nil
|
||||||
|
case name == "kubectl" && strings.Contains(joined, "get --raw /api/v1/nodes/titan-07/proxy/healthz"):
|
||||||
|
healthChecks++
|
||||||
|
if healthChecks == 1 {
|
||||||
|
return "proxy error from 127.0.0.1:6443 while dialing 192.168.22.33:10250, code 502: 502 Bad Gateway", errors.New("exit status 1")
|
||||||
|
}
|
||||||
|
return "ok", nil
|
||||||
|
case name == "kubectl" && strings.Contains(joined, "uncordon titan-07"):
|
||||||
|
uncordoned = true
|
||||||
|
return "", nil
|
||||||
|
case name == "kubectl" && strings.Contains(joined, "cordon titan-07"):
|
||||||
|
cordoned = true
|
||||||
|
return "", nil
|
||||||
|
case name == "ssh" && strings.Contains(joined, "sudo -n systemctl restart k3s-agent"):
|
||||||
|
restarted = true
|
||||||
|
return "", nil
|
||||||
|
case name == "kubectl" && strings.Contains(joined, "wait node/titan-07 --for=condition=Ready --timeout=120s"):
|
||||||
|
waited = true
|
||||||
|
return "", nil
|
||||||
|
default:
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
orch.SetCommandOverrides(dispatch, dispatch)
|
||||||
|
|
||||||
|
repaired, err := orch.repairBrokenKubeletProxies(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("repairBrokenKubeletProxies failed: %v", err)
|
||||||
|
}
|
||||||
|
if repaired != 1 {
|
||||||
|
t.Fatalf("expected one repaired node, got %d", repaired)
|
||||||
|
}
|
||||||
|
for name, ok := range map[string]bool{
|
||||||
|
"cordoned": cordoned,
|
||||||
|
"restarted": restarted,
|
||||||
|
"waited": waited,
|
||||||
|
"uncordoned": uncordoned,
|
||||||
|
} {
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("expected %s action", name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if healthChecks != 2 {
|
||||||
|
t.Fatalf("expected health check before and after repair, got %d", healthChecks)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRepairBrokenKubeletProxiesPreservesExistingCordon runs one orchestration or CLI step.
|
||||||
|
// Signature: TestRepairBrokenKubeletProxiesPreservesExistingCordon(t *testing.T).
|
||||||
|
// Why: nodes intentionally kept out of service must not be accidentally
|
||||||
|
// uncordoned just because Ananke repaired their kubelet proxy.
|
||||||
|
func TestRepairBrokenKubeletProxiesPreservesExistingCordon(t *testing.T) {
|
||||||
|
cfg := config.Config{SSHManagedNodes: []string{"titan-18"}}
|
||||||
|
orch := buildOrchestratorWithStubs(t, cfg, nil)
|
||||||
|
|
||||||
|
healthChecks := 0
|
||||||
|
cordonTouched := false
|
||||||
|
restarted := false
|
||||||
|
dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
|
||||||
|
joined := strings.Join(args, " ")
|
||||||
|
switch {
|
||||||
|
case name == "kubectl" && strings.Contains(joined, "get nodes -o json"):
|
||||||
|
return `{"items":[{"metadata":{"name":"titan-18"},"spec":{"unschedulable":true},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`, nil
|
||||||
|
case name == "kubectl" && strings.Contains(joined, "get --raw /api/v1/nodes/titan-18/proxy/healthz"):
|
||||||
|
healthChecks++
|
||||||
|
if healthChecks == 1 {
|
||||||
|
return "error trying to reach service: proxy error from 127.0.0.1:6443 while dialing 192.168.22.46:10250", errors.New("exit status 1")
|
||||||
|
}
|
||||||
|
return "ok", nil
|
||||||
|
case name == "kubectl" && (strings.Contains(joined, "cordon titan-18") || strings.Contains(joined, "uncordon titan-18")):
|
||||||
|
cordonTouched = true
|
||||||
|
return "", nil
|
||||||
|
case name == "ssh" && strings.Contains(joined, "sudo -n systemctl restart k3s-agent"):
|
||||||
|
restarted = true
|
||||||
|
return "", nil
|
||||||
|
case name == "kubectl" && strings.Contains(joined, "wait node/titan-18 --for=condition=Ready --timeout=120s"):
|
||||||
|
return "", nil
|
||||||
|
default:
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
orch.SetCommandOverrides(dispatch, dispatch)
|
||||||
|
|
||||||
|
repaired, err := orch.repairBrokenKubeletProxies(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("repairBrokenKubeletProxies failed: %v", err)
|
||||||
|
}
|
||||||
|
if repaired != 1 {
|
||||||
|
t.Fatalf("expected one repaired node, got %d", repaired)
|
||||||
|
}
|
||||||
|
if !restarted {
|
||||||
|
t.Fatalf("expected k3s-agent restart")
|
||||||
|
}
|
||||||
|
if cordonTouched {
|
||||||
|
t.Fatalf("did not expect cordon state to change for already-unschedulable node")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestPostStartAutoHealAggregatesErrors runs one orchestration or CLI step.
|
// TestPostStartAutoHealAggregatesErrors runs one orchestration or CLI step.
|
||||||
// Signature: TestPostStartAutoHealAggregatesErrors(t *testing.T).
|
// Signature: TestPostStartAutoHealAggregatesErrors(t *testing.T).
|
||||||
// Why: proves the daemon reports each failed sub-repair together instead of
|
// Why: proves the daemon reports each failed sub-repair together instead of
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user