metis/pkg/service/cluster.go

361 lines
10 KiB
Go

package service
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"sort"
"strings"
"time"
)
// clusterNode is a flattened summary of a Kubernetes node as returned by
// GET /api/v1/nodes, keeping only the fields used for scheduling decisions.
type clusterNode struct {
	Name          string // metadata.name, trimmed
	Arch          string // "kubernetes.io/arch" label (e.g. "amd64", "arm64")
	Hardware      string // free-form "hardware" label, if present
	Worker        bool   // "node-role.kubernetes.io/worker" label equals "true"
	ControlPlane  bool   // control-plane or legacy master role label is non-empty
	Unschedulable bool   // spec.unschedulable (node is cordoned)
}
// podState is a condensed view of a pod's status: its lifecycle phase plus
// the most specific reason/message available (container waiting/terminated
// details override the top-level pod status — see remotePodState).
type podState struct {
	Name    string // metadata.name
	Phase   string // status.phase (e.g. "Succeeded", "Failed")
	Reason  string // machine-readable cause, when reported
	Message string // human-readable detail, when reported
}
// kubeClient is a minimal Kubernetes API client that authenticates with a
// bearer token over TLS.
type kubeClient struct {
	baseURL string       // API server root, e.g. "https://host:port", no trailing slash
	token   string       // service-account bearer token
	client  *http.Client // TLS-configured client with a request timeout
}
// kubeClientFactory builds the Kubernetes client; a package variable
// (presumably a seam so tests can substitute a fake — defaults to the
// in-cluster configuration).
var kubeClientFactory = inClusterKubeClient

// Paths to the mounted service-account credentials. Declared as variables
// rather than constants, presumably so tests can point them at fixtures.
var (
	kubeServiceAccountTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
	kubeServiceAccountCAPath    = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
)
// inClusterKubeClient builds a kubeClient from the standard in-cluster
// environment: KUBERNETES_SERVICE_HOST/KUBERNETES_SERVICE_PORT plus the
// mounted service-account token and CA certificate. It returns an error when
// the environment variables are unset (not running inside a cluster), when
// either credential file cannot be read, or when the CA PEM cannot be parsed.
func inClusterKubeClient() (*kubeClient, error) {
	host := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_HOST"))
	port := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_PORT"))
	if host == "" || port == "" {
		return nil, fmt.Errorf("not running in cluster")
	}
	token, err := os.ReadFile(kubeServiceAccountTokenPath)
	if err != nil {
		// Wrap so callers can tell which credential was missing.
		return nil, fmt.Errorf("reading service-account token: %w", err)
	}
	caPEM, err := os.ReadFile(kubeServiceAccountCAPath)
	if err != nil {
		return nil, fmt.Errorf("reading service-account CA: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("append kubernetes CA")
	}
	return &kubeClient{
		baseURL: fmt.Sprintf("https://%s:%s", host, port),
		token:   strings.TrimSpace(string(token)),
		client: &http.Client{
			// Guard every API call with a hard timeout.
			Timeout: 30 * time.Second,
			Transport: &http.Transport{
				TLSClientConfig: &tls.Config{RootCAs: pool},
			},
		},
	}, nil
}
// jsonRequest issues an HTTP request against the API server, JSON-encoding
// body when it is non-nil and decoding the JSON response into out when out is
// non-nil. Any status >= 300 is turned into an error that includes a bounded
// slice of the response payload.
func (k *kubeClient) jsonRequest(method, path string, body any, out any) error {
	var reader io.Reader
	if body != nil {
		data, err := json.Marshal(body)
		if err != nil {
			return err
		}
		reader = bytes.NewReader(data)
	}
	req, err := http.NewRequest(method, k.baseURL+path, reader)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+k.token)
	if body != nil {
		req.Header.Set("Content-Type", "application/json")
	}
	resp, err := k.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		// Keep error messages readable by capping the echoed payload.
		payload, _ := io.ReadAll(io.LimitReader(resp.Body, 8192))
		return fmt.Errorf("%s %s failed: %s: %s", method, path, resp.Status, strings.TrimSpace(string(payload)))
	}
	if out == nil {
		// Drain (bounded) so the transport can reuse the keep-alive
		// connection; closing an unread body forces a new connection.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 1<<20))
		return nil
	}
	// Cap the decoded payload at 1 MiB to bound memory use.
	return json.NewDecoder(io.LimitReader(resp.Body, 1<<20)).Decode(out)
}
// deleteRequest issues a DELETE against the API server. 200, 202, and 404 are
// all treated as success (404 means the object is already gone); any other
// status becomes an error carrying a truncated response payload.
func (k *kubeClient) deleteRequest(path string) error {
	req, err := http.NewRequest(http.MethodDelete, k.baseURL+path, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+k.token)
	resp, err := k.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	switch resp.StatusCode {
	case http.StatusOK, http.StatusAccepted, http.StatusNotFound:
		// Drain (bounded) so the transport can reuse the connection.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 1<<20))
		return nil
	}
	payload, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
	return fmt.Errorf("delete %s failed: %s: %s", path, resp.Status, strings.TrimSpace(string(payload)))
}
// clusterNodes lists the cluster's nodes via the Kubernetes API and returns a
// name-sorted summary of each. Best-effort: any client-construction or API
// error yields nil rather than an error.
func clusterNodes() []clusterNode {
	kube, err := kubeClientFactory()
	if err != nil {
		return nil
	}
	var nodeList struct {
		Items []struct {
			Metadata struct {
				Name   string            `json:"name"`
				Labels map[string]string `json:"labels"`
			} `json:"metadata"`
			Spec struct {
				Unschedulable bool `json:"unschedulable"`
			} `json:"spec"`
		} `json:"items"`
	}
	if err := kube.jsonRequest(http.MethodGet, "/api/v1/nodes", nil, &nodeList); err != nil {
		return nil
	}
	out := make([]clusterNode, 0, len(nodeList.Items))
	for _, entry := range nodeList.Items {
		labels := entry.Metadata.Labels
		summary := clusterNode{
			Name:          strings.TrimSpace(entry.Metadata.Name),
			Arch:          strings.TrimSpace(labels["kubernetes.io/arch"]),
			Hardware:      strings.TrimSpace(labels["hardware"]),
			Worker:        labels["node-role.kubernetes.io/worker"] == "true",
			Unschedulable: entry.Spec.Unschedulable,
		}
		// Either the modern control-plane label or the legacy master label
		// marks a control-plane node; any non-empty value counts.
		summary.ControlPlane = labels["node-role.kubernetes.io/control-plane"] != "" ||
			labels["node-role.kubernetes.io/master"] != ""
		out = append(out, summary)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}
// clusterActiveRemotePodLoads counts, per node, the "app=metis-remote" pods
// in the given namespace that are still active (phase not Succeeded/Failed)
// and already scheduled onto a node. When run is non-empty, only pods whose
// "metis-run" label matches are counted. Best-effort: returns nil when the
// client cannot be built, the namespace is blank, or the API call fails.
func clusterActiveRemotePodLoads(namespace, run string) map[string]int {
	kube, err := kubeClientFactory()
	if err != nil {
		return nil
	}
	ns := url.PathEscape(strings.TrimSpace(namespace))
	if ns == "" {
		return nil
	}
	// Trim once up front; the value is reused both in the selector and in the
	// per-pod re-check below.
	runValue := strings.TrimSpace(run)
	selector := "app=metis-remote"
	if runValue != "" {
		selector += ",metis-run=" + runValue
	}
	path := fmt.Sprintf("/api/v1/namespaces/%s/pods?labelSelector=%s", ns, url.QueryEscape(selector))
	var payload struct {
		Items []struct {
			Metadata struct {
				Labels map[string]string `json:"labels"`
			} `json:"metadata"`
			Spec struct {
				NodeName string `json:"nodeName"`
			} `json:"spec"`
			Status struct {
				Phase string `json:"phase"`
			} `json:"status"`
		} `json:"items"`
	}
	if err := kube.jsonRequest(http.MethodGet, path, nil, &payload); err != nil {
		return nil
	}
	loads := map[string]int{}
	for _, item := range payload.Items {
		phase := strings.TrimSpace(item.Status.Phase)
		if phase == "Succeeded" || phase == "Failed" {
			continue // pod already finished; no longer a load
		}
		// Re-check the run label client-side even though the API selector
		// already filtered on it.
		if runValue != "" && strings.TrimSpace(item.Metadata.Labels["metis-run"]) != runValue {
			continue
		}
		nodeName := strings.TrimSpace(item.Spec.NodeName)
		if nodeName == "" {
			continue // not yet scheduled onto a node
		}
		loads[nodeName]++
	}
	return loads
}
// podImageForArch returns the configured runner image for the given CPU
// architecture ("arm64" or "amd64"), trimmed of whitespace, or "" for any
// other architecture value.
func (a *App) podImageForArch(arch string) string {
	var image string
	switch strings.TrimSpace(arch) {
	case "arm64":
		image = a.settings.RunnerImageARM64
	case "amd64":
		image = a.settings.RunnerImageAMD64
	}
	return strings.TrimSpace(image)
}
// runRemotePod creates a worker pod from podSpec in the configured namespace,
// polls it until it reaches a terminal phase, and returns its result payload.
// The pod is deleted before creation (clearing any stale pod of the same
// name) and again on return via defer. When jobID is non-empty, the job is
// heartbeated on every poll tick (presumably to keep it marked alive — see
// heartbeatRemoteJob). Times out after 12 minutes of polling.
func (a *App) runRemotePod(jobID, podName string, podSpec map[string]any) (string, error) {
	kube, err := kubeClientFactory()
	if err != nil {
		return "", err
	}
	ns := url.PathEscape(a.settings.Namespace)
	// Best-effort removal of a leftover pod with the same name, then make
	// sure this run's pod is cleaned up whenever we return.
	_ = kube.deleteRequest(fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", ns, url.PathEscape(podName)))
	defer func() {
		_ = kube.deleteRequest(fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", ns, url.PathEscape(podName)))
	}()
	if err := kube.jsonRequest(http.MethodPost, fmt.Sprintf("/api/v1/namespaces/%s/pods", ns), podSpec, nil); err != nil {
		return "", err
	}
	// Poll the pod's state every 2 seconds, up to the deadline.
	deadline := time.Now().Add(12 * time.Minute)
	lastState := podState{Name: podName}
	for time.Now().Before(deadline) {
		state, err := a.remotePodState(kube, podName)
		if err != nil {
			return "", err
		}
		lastState = state
		if strings.TrimSpace(jobID) != "" {
			a.heartbeatRemoteJob(jobID)
		}
		switch state.Phase {
		case "Succeeded":
			// Prefer the status message as the result payload; fall back to
			// the pod's logs when the message is empty.
			if strings.TrimSpace(state.Message) != "" {
				return strings.TrimSpace(state.Message), nil
			}
			logs, logErr := a.remotePodLogs(kube, podName)
			if strings.TrimSpace(logs) != "" {
				return strings.TrimSpace(logs), nil
			}
			if logErr != nil {
				return "", fmt.Errorf("remote pod %s succeeded but did not return a result payload; logs unavailable: %v", podName, logErr)
			}
			return "", fmt.Errorf("remote pod %s succeeded but did not return a result payload", podName)
		case "Failed":
			// Report the most specific failure detail available: status
			// message first, then logs, then the bare reason.
			if strings.TrimSpace(state.Message) != "" {
				return "", fmt.Errorf("remote pod %s failed: %s", podName, strings.TrimSpace(state.Message))
			}
			if logs, logErr := a.remotePodLogs(kube, podName); logErr == nil && strings.TrimSpace(logs) != "" {
				return "", fmt.Errorf("remote pod %s failed: %s", podName, strings.TrimSpace(logs))
			}
			reason := strings.TrimSpace(state.Reason)
			if reason == "" {
				reason = "remote worker failed before reporting details"
			}
			return "", fmt.Errorf("remote pod %s failed: %s", podName, reason)
		}
		time.Sleep(2 * time.Second)
	}
	// Deadline hit without reaching a terminal phase; include the last
	// observed state when we have one.
	if lastState.Phase != "" {
		return "", fmt.Errorf("remote pod %s timed out in phase %s: %s %s", podName, lastState.Phase, strings.TrimSpace(lastState.Reason), strings.TrimSpace(lastState.Message))
	}
	return "", fmt.Errorf("remote pod %s timed out", podName)
}
// remotePodState fetches the pod's current status and condenses it into a
// podState. Waiting/terminated details from the container status take
// precedence over the top-level pod status, since they are usually more
// specific. (The original also decoded status.conditions but never read
// them, so that dead field has been dropped from the payload struct.)
func (a *App) remotePodState(kube *kubeClient, podName string) (podState, error) {
	var payload struct {
		Metadata struct {
			Name string `json:"name"`
		} `json:"metadata"`
		Status struct {
			Phase             string `json:"phase"`
			Reason            string `json:"reason"`
			Message           string `json:"message"`
			ContainerStatuses []struct {
				State struct {
					Waiting struct {
						Reason  string `json:"reason"`
						Message string `json:"message"`
					} `json:"waiting"`
					Terminated struct {
						Reason  string `json:"reason"`
						Message string `json:"message"`
					} `json:"terminated"`
				} `json:"state"`
			} `json:"containerStatuses"`
		} `json:"status"`
	}
	ns := url.PathEscape(a.settings.Namespace)
	if err := kube.jsonRequest(http.MethodGet, fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", ns, url.PathEscape(podName)), nil, &payload); err != nil {
		return podState{}, err
	}
	out := podState{
		Name:    payload.Metadata.Name,
		Phase:   payload.Status.Phase,
		Reason:  payload.Status.Reason,
		Message: payload.Status.Message,
	}
	if len(payload.Status.ContainerStatuses) > 0 {
		// NOTE(review): only the first container status is inspected —
		// assumes a single-container pod; confirm against the pod spec.
		waiting := payload.Status.ContainerStatuses[0].State.Waiting
		terminated := payload.Status.ContainerStatuses[0].State.Terminated
		if strings.TrimSpace(waiting.Reason) != "" {
			out.Reason = waiting.Reason
			out.Message = waiting.Message
		}
		// Terminated details win over waiting ones. Reason and message are
		// overridden independently so a blank terminated message does not
		// erase a waiting message.
		if strings.TrimSpace(terminated.Reason) != "" {
			out.Reason = terminated.Reason
		}
		if strings.TrimSpace(terminated.Message) != "" {
			out.Message = terminated.Message
		}
	}
	return out, nil
}
// remotePodLogs fetches the raw log output of the named pod, capped at 1 MiB.
// Failures whose payload suggests the API server could not proxy to the
// node's kubelet log endpoint are reported with a more descriptive message.
func (a *App) remotePodLogs(kube *kubeClient, podName string) (string, error) {
	ns := url.PathEscape(a.settings.Namespace)
	logURL := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/log", kube.baseURL, ns, url.PathEscape(podName))
	req, err := http.NewRequest(http.MethodGet, logURL, nil)
	if err != nil {
		return "", err
	}
	req.Header.Set("Authorization", "Bearer "+kube.token)
	resp, err := kube.client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		raw, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
		message := strings.TrimSpace(string(raw))
		kubeletUnreachable := strings.Contains(message, "proxy error from 127.0.0.1:6443") ||
			strings.Contains(message, "containerLogs")
		if kubeletUnreachable {
			return "", fmt.Errorf("pod logs %s failed because Kubernetes could not reach the node kubelet log endpoint: %s", podName, message)
		}
		return "", fmt.Errorf("pod logs %s failed: %s: %s", podName, resp.Status, message)
	}
	logs, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
	if err != nil {
		return "", err
	}
	return string(logs), nil
}