package service import ( "bytes" "crypto/tls" "crypto/x509" "encoding/json" "fmt" "io" "net/http" "net/url" "os" "sort" "strings" "time" ) type clusterNode struct { Name string Arch string Hardware string Worker bool ControlPlane bool Unschedulable bool } type podState struct { Name string Phase string Reason string Message string } type kubeClient struct { baseURL string token string client *http.Client } func inClusterKubeClient() (*kubeClient, error) { host := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_HOST")) port := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_PORT")) if host == "" || port == "" { return nil, fmt.Errorf("not running in cluster") } token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token") if err != nil { return nil, err } caPEM, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") if err != nil { return nil, err } pool := x509.NewCertPool() if !pool.AppendCertsFromPEM(caPEM) { return nil, fmt.Errorf("append kubernetes CA") } return &kubeClient{ baseURL: fmt.Sprintf("https://%s:%s", host, port), token: strings.TrimSpace(string(token)), client: &http.Client{ Timeout: 30 * time.Second, Transport: &http.Transport{ TLSClientConfig: &tls.Config{RootCAs: pool}, }, }, }, nil } func (k *kubeClient) jsonRequest(method, path string, body any, out any) error { var reader io.Reader if body != nil { data, err := json.Marshal(body) if err != nil { return err } reader = bytes.NewReader(data) } req, err := http.NewRequest(method, k.baseURL+path, reader) if err != nil { return err } req.Header.Set("Authorization", "Bearer "+k.token) if body != nil { req.Header.Set("Content-Type", "application/json") } resp, err := k.client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode >= 300 { payload, _ := io.ReadAll(io.LimitReader(resp.Body, 8192)) return fmt.Errorf("%s %s failed: %s: %s", method, path, resp.Status, strings.TrimSpace(string(payload))) } if out == nil { return nil } return json.NewDecoder(io.LimitReader(resp.Body, 1<<20)).Decode(out) } func (k *kubeClient) deleteRequest(path string) error { req, err := http.NewRequest(http.MethodDelete, k.baseURL+path, nil) if err != nil { return err } req.Header.Set("Authorization", "Bearer "+k.token) resp, err := k.client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusAccepted { return nil } payload, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) return fmt.Errorf("delete %s failed: %s: %s", path, resp.Status, strings.TrimSpace(string(payload))) } func clusterNodes() []clusterNode { kube, err := inClusterKubeClient() if err != nil { return nil } var payload struct { Items []struct { Metadata struct { Name string `json:"name"` Labels map[string]string `json:"labels"` } `json:"metadata"` Spec struct { Unschedulable bool `json:"unschedulable"` } `json:"spec"` } `json:"items"` } if err := kube.jsonRequest(http.MethodGet, "/api/v1/nodes", nil, &payload); err != nil { return nil } nodes := make([]clusterNode, 0, len(payload.Items)) for _, item := range payload.Items { labels := item.Metadata.Labels nodes = append(nodes, clusterNode{ Name: strings.TrimSpace(item.Metadata.Name), Arch: strings.TrimSpace(labels["kubernetes.io/arch"]), Hardware: strings.TrimSpace(labels["hardware"]), Worker: labels["node-role.kubernetes.io/worker"] == "true", ControlPlane: labels["node-role.kubernetes.io/control-plane"] != "" || labels["node-role.kubernetes.io/master"] != "", Unschedulable: item.Spec.Unschedulable, }) } sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name }) return nodes } func (a *App) podImageForArch(arch string) string { switch strings.TrimSpace(arch) { case "arm64": return strings.TrimSpace(a.settings.RunnerImageARM64) case "amd64": return strings.TrimSpace(a.settings.RunnerImageAMD64) default: return "" } } func (a *App) runRemotePod(jobID, podName string, podSpec map[string]any) (string, error) { kube, err := inClusterKubeClient() if err != nil { return "", err } ns := url.PathEscape(a.settings.Namespace) _ = kube.deleteRequest(fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", ns, url.PathEscape(podName))) defer func() { _ = kube.deleteRequest(fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", ns, url.PathEscape(podName))) }() if err := kube.jsonRequest(http.MethodPost, fmt.Sprintf("/api/v1/namespaces/%s/pods", ns), podSpec, nil); err != nil { return "", err } deadline := time.Now().Add(12 * time.Minute) lastState := podState{Name: podName} for time.Now().Before(deadline) { state, err := a.remotePodState(kube, podName) if err != nil { return "", err } lastState = state if strings.TrimSpace(jobID) != "" { a.heartbeatRemoteJob(jobID) } switch state.Phase { case "Succeeded": if strings.TrimSpace(state.Message) != "" { return strings.TrimSpace(state.Message), nil } logs, logErr := a.remotePodLogs(kube, podName) if strings.TrimSpace(logs) != "" { return strings.TrimSpace(logs), nil } if logErr != nil { return "", fmt.Errorf("remote pod %s succeeded but did not return a result payload; logs unavailable: %v", podName, logErr) } return "", fmt.Errorf("remote pod %s succeeded but did not return a result payload", podName) case "Failed": if strings.TrimSpace(state.Message) != "" { return "", fmt.Errorf("remote pod %s failed: %s", podName, strings.TrimSpace(state.Message)) } if logs, logErr := a.remotePodLogs(kube, podName); logErr == nil && strings.TrimSpace(logs) != "" { return "", fmt.Errorf("remote pod %s failed: %s", podName, strings.TrimSpace(logs)) } reason := strings.TrimSpace(state.Reason) if reason == "" { reason = "remote worker failed before reporting details" } return "", fmt.Errorf("remote pod %s failed: %s", podName, reason) } time.Sleep(2 * time.Second) } if lastState.Phase != "" { return "", fmt.Errorf("remote pod %s timed out in phase %s: %s %s", podName, lastState.Phase, strings.TrimSpace(lastState.Reason), strings.TrimSpace(lastState.Message)) } return "", fmt.Errorf("remote pod %s timed out", podName) } func (a *App) remotePodState(kube *kubeClient, podName string) (podState, error) { var payload struct { Metadata struct { Name string `json:"name"` } `json:"metadata"` Status struct { Phase string `json:"phase"` Reason string `json:"reason"` Message string `json:"message"` Conditions []struct { Type string `json:"type"` Status string `json:"status"` Reason string `json:"reason"` Message string `json:"message"` } `json:"conditions"` ContainerStatuses []struct { State struct { Waiting struct { Reason string `json:"reason"` Message string `json:"message"` } `json:"waiting"` Terminated struct { Reason string `json:"reason"` Message string `json:"message"` } `json:"terminated"` } `json:"state"` } `json:"containerStatuses"` } `json:"status"` } ns := url.PathEscape(a.settings.Namespace) if err := kube.jsonRequest(http.MethodGet, fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", ns, url.PathEscape(podName)), nil, &payload); err != nil { return podState{}, err } out := podState{ Name: payload.Metadata.Name, Phase: payload.Status.Phase, Reason: payload.Status.Reason, Message: payload.Status.Message, } if len(payload.Status.ContainerStatuses) > 0 { waiting := payload.Status.ContainerStatuses[0].State.Waiting terminated := payload.Status.ContainerStatuses[0].State.Terminated if strings.TrimSpace(waiting.Reason) != "" { out.Reason = waiting.Reason out.Message = waiting.Message } if strings.TrimSpace(terminated.Reason) != "" { out.Reason = terminated.Reason } if strings.TrimSpace(terminated.Message) != "" { out.Message = terminated.Message } } return out, nil } func (a *App) remotePodLogs(kube *kubeClient, podName string) (string, error) { ns := url.PathEscape(a.settings.Namespace) req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/log", kube.baseURL, ns, url.PathEscape(podName)), nil) if err != nil { return "", err } req.Header.Set("Authorization", "Bearer "+kube.token) resp, err := kube.client.Do(req) if err != nil { return "", err } defer resp.Body.Close() if resp.StatusCode >= 300 { body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) message := strings.TrimSpace(string(body)) if strings.Contains(message, "proxy error from 127.0.0.1:6443") || strings.Contains(message, "containerLogs") { return "", fmt.Errorf("pod logs %s failed because Kubernetes could not reach the node kubelet log endpoint: %s", podName, message) } return "", fmt.Errorf("pod logs %s failed: %s: %s", podName, resp.Status, message) } body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) if err != nil { return "", err } return string(body), nil }