307 lines
9.1 KiB
Go
307 lines
9.1 KiB
Go
package service
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// clusterNode is the per-node summary built by clusterNodes: the node name,
// the "kubernetes.io/arch" and "hardware" label values, whether the node
// carries worker / control-plane role labels, and whether it is cordoned
// (spec.unschedulable).
type clusterNode struct {
	Name, Arch, Hardware                string
	Worker, ControlPlane, Unschedulable bool
}
|
|
|
|
// podState is the condensed status of one pod as produced by remotePodState:
// its name, lifecycle phase, and the most specific reason/message found in
// the pod or container status.
type podState struct {
	Name, Phase, Reason, Message string
}
|
|
|
|
// kubeClient is a minimal Kubernetes API client: request paths are appended
// to baseURL, token is sent as a bearer credential, and client performs the
// HTTP round trips.
type kubeClient struct {
	baseURL, token string
	client         *http.Client
}
|
|
|
|
func inClusterKubeClient() (*kubeClient, error) {
|
|
host := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_HOST"))
|
|
port := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_PORT"))
|
|
if host == "" || port == "" {
|
|
return nil, fmt.Errorf("not running in cluster")
|
|
}
|
|
token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
caPEM, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pool := x509.NewCertPool()
|
|
if !pool.AppendCertsFromPEM(caPEM) {
|
|
return nil, fmt.Errorf("append kubernetes CA")
|
|
}
|
|
return &kubeClient{
|
|
baseURL: fmt.Sprintf("https://%s:%s", host, port),
|
|
token: strings.TrimSpace(string(token)),
|
|
client: &http.Client{
|
|
Timeout: 30 * time.Second,
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: &tls.Config{RootCAs: pool},
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (k *kubeClient) jsonRequest(method, path string, body any, out any) error {
|
|
var reader io.Reader
|
|
if body != nil {
|
|
data, err := json.Marshal(body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reader = bytes.NewReader(data)
|
|
}
|
|
req, err := http.NewRequest(method, k.baseURL+path, reader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Authorization", "Bearer "+k.token)
|
|
if body != nil {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
}
|
|
resp, err := k.client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
payload, _ := io.ReadAll(io.LimitReader(resp.Body, 8192))
|
|
return fmt.Errorf("%s %s failed: %s: %s", method, path, resp.Status, strings.TrimSpace(string(payload)))
|
|
}
|
|
if out == nil {
|
|
return nil
|
|
}
|
|
return json.NewDecoder(io.LimitReader(resp.Body, 1<<20)).Decode(out)
|
|
}
|
|
|
|
func (k *kubeClient) deleteRequest(path string) error {
|
|
req, err := http.NewRequest(http.MethodDelete, k.baseURL+path, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Authorization", "Bearer "+k.token)
|
|
resp, err := k.client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusAccepted {
|
|
return nil
|
|
}
|
|
payload, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
|
return fmt.Errorf("delete %s failed: %s: %s", path, resp.Status, strings.TrimSpace(string(payload)))
|
|
}
|
|
|
|
func clusterNodes() []clusterNode {
|
|
kube, err := inClusterKubeClient()
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
var payload struct {
|
|
Items []struct {
|
|
Metadata struct {
|
|
Name string `json:"name"`
|
|
Labels map[string]string `json:"labels"`
|
|
} `json:"metadata"`
|
|
Spec struct {
|
|
Unschedulable bool `json:"unschedulable"`
|
|
} `json:"spec"`
|
|
} `json:"items"`
|
|
}
|
|
if err := kube.jsonRequest(http.MethodGet, "/api/v1/nodes", nil, &payload); err != nil {
|
|
return nil
|
|
}
|
|
nodes := make([]clusterNode, 0, len(payload.Items))
|
|
for _, item := range payload.Items {
|
|
labels := item.Metadata.Labels
|
|
nodes = append(nodes, clusterNode{
|
|
Name: strings.TrimSpace(item.Metadata.Name),
|
|
Arch: strings.TrimSpace(labels["kubernetes.io/arch"]),
|
|
Hardware: strings.TrimSpace(labels["hardware"]),
|
|
Worker: labels["node-role.kubernetes.io/worker"] == "true",
|
|
ControlPlane: labels["node-role.kubernetes.io/control-plane"] != "" || labels["node-role.kubernetes.io/master"] != "",
|
|
Unschedulable: item.Spec.Unschedulable,
|
|
})
|
|
}
|
|
sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name })
|
|
return nodes
|
|
}
|
|
|
|
func (a *App) podImageForArch(arch string) string {
|
|
switch strings.TrimSpace(arch) {
|
|
case "arm64":
|
|
return strings.TrimSpace(a.settings.RunnerImageARM64)
|
|
case "amd64":
|
|
return strings.TrimSpace(a.settings.RunnerImageAMD64)
|
|
default:
|
|
return ""
|
|
}
|
|
}
|
|
|
|
func (a *App) runRemotePod(jobID, podName string, podSpec map[string]any) (string, error) {
|
|
kube, err := inClusterKubeClient()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
ns := url.PathEscape(a.settings.Namespace)
|
|
_ = kube.deleteRequest(fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", ns, url.PathEscape(podName)))
|
|
defer func() {
|
|
_ = kube.deleteRequest(fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", ns, url.PathEscape(podName)))
|
|
}()
|
|
if err := kube.jsonRequest(http.MethodPost, fmt.Sprintf("/api/v1/namespaces/%s/pods", ns), podSpec, nil); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
deadline := time.Now().Add(12 * time.Minute)
|
|
lastState := podState{Name: podName}
|
|
for time.Now().Before(deadline) {
|
|
state, err := a.remotePodState(kube, podName)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
lastState = state
|
|
if strings.TrimSpace(jobID) != "" {
|
|
a.heartbeatRemoteJob(jobID)
|
|
}
|
|
switch state.Phase {
|
|
case "Succeeded":
|
|
if strings.TrimSpace(state.Message) != "" {
|
|
return strings.TrimSpace(state.Message), nil
|
|
}
|
|
logs, logErr := a.remotePodLogs(kube, podName)
|
|
if strings.TrimSpace(logs) != "" {
|
|
return strings.TrimSpace(logs), nil
|
|
}
|
|
if logErr != nil {
|
|
return "", fmt.Errorf("remote pod %s succeeded but did not return a result payload; logs unavailable: %v", podName, logErr)
|
|
}
|
|
return "", fmt.Errorf("remote pod %s succeeded but did not return a result payload", podName)
|
|
case "Failed":
|
|
if strings.TrimSpace(state.Message) != "" {
|
|
return "", fmt.Errorf("remote pod %s failed: %s", podName, strings.TrimSpace(state.Message))
|
|
}
|
|
if logs, logErr := a.remotePodLogs(kube, podName); logErr == nil && strings.TrimSpace(logs) != "" {
|
|
return "", fmt.Errorf("remote pod %s failed: %s", podName, strings.TrimSpace(logs))
|
|
}
|
|
reason := strings.TrimSpace(state.Reason)
|
|
if reason == "" {
|
|
reason = "remote worker failed before reporting details"
|
|
}
|
|
return "", fmt.Errorf("remote pod %s failed: %s", podName, reason)
|
|
}
|
|
time.Sleep(2 * time.Second)
|
|
}
|
|
if lastState.Phase != "" {
|
|
return "", fmt.Errorf("remote pod %s timed out in phase %s: %s %s", podName, lastState.Phase, strings.TrimSpace(lastState.Reason), strings.TrimSpace(lastState.Message))
|
|
}
|
|
return "", fmt.Errorf("remote pod %s timed out", podName)
|
|
}
|
|
|
|
func (a *App) remotePodState(kube *kubeClient, podName string) (podState, error) {
|
|
var payload struct {
|
|
Metadata struct {
|
|
Name string `json:"name"`
|
|
} `json:"metadata"`
|
|
Status struct {
|
|
Phase string `json:"phase"`
|
|
Reason string `json:"reason"`
|
|
Message string `json:"message"`
|
|
Conditions []struct {
|
|
Type string `json:"type"`
|
|
Status string `json:"status"`
|
|
Reason string `json:"reason"`
|
|
Message string `json:"message"`
|
|
} `json:"conditions"`
|
|
ContainerStatuses []struct {
|
|
State struct {
|
|
Waiting struct {
|
|
Reason string `json:"reason"`
|
|
Message string `json:"message"`
|
|
} `json:"waiting"`
|
|
Terminated struct {
|
|
Reason string `json:"reason"`
|
|
Message string `json:"message"`
|
|
} `json:"terminated"`
|
|
} `json:"state"`
|
|
} `json:"containerStatuses"`
|
|
} `json:"status"`
|
|
}
|
|
ns := url.PathEscape(a.settings.Namespace)
|
|
if err := kube.jsonRequest(http.MethodGet, fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", ns, url.PathEscape(podName)), nil, &payload); err != nil {
|
|
return podState{}, err
|
|
}
|
|
out := podState{
|
|
Name: payload.Metadata.Name,
|
|
Phase: payload.Status.Phase,
|
|
Reason: payload.Status.Reason,
|
|
Message: payload.Status.Message,
|
|
}
|
|
if len(payload.Status.ContainerStatuses) > 0 {
|
|
waiting := payload.Status.ContainerStatuses[0].State.Waiting
|
|
terminated := payload.Status.ContainerStatuses[0].State.Terminated
|
|
if strings.TrimSpace(waiting.Reason) != "" {
|
|
out.Reason = waiting.Reason
|
|
out.Message = waiting.Message
|
|
}
|
|
if strings.TrimSpace(terminated.Reason) != "" {
|
|
out.Reason = terminated.Reason
|
|
}
|
|
if strings.TrimSpace(terminated.Message) != "" {
|
|
out.Message = terminated.Message
|
|
}
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (a *App) remotePodLogs(kube *kubeClient, podName string) (string, error) {
|
|
ns := url.PathEscape(a.settings.Namespace)
|
|
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/log", kube.baseURL, ns, url.PathEscape(podName)), nil)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
req.Header.Set("Authorization", "Bearer "+kube.token)
|
|
resp, err := kube.client.Do(req)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
|
message := strings.TrimSpace(string(body))
|
|
if strings.Contains(message, "proxy error from 127.0.0.1:6443") || strings.Contains(message, "containerLogs") {
|
|
return "", fmt.Errorf("pod logs %s failed because Kubernetes could not reach the node kubelet log endpoint: %s", podName, message)
|
|
}
|
|
return "", fmt.Errorf("pod logs %s failed: %s: %s", podName, resp.Status, message)
|
|
}
|
|
body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return string(body), nil
|
|
}
|