// 2026-03-31 20:42:35 -03:00

package service
import (
	"bytes"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"os"
	"sort"
	"strings"
	"time"
)
// clusterNode is the condensed description of one Kubernetes node that
// clusterNodes builds from the node list returned by the API server.
type clusterNode struct {
	// Name is the node's metadata name; Arch and Hardware carry the values of
	// the "kubernetes.io/arch" and "hardware" labels respectively.
	Name, Arch, Hardware string

	// Worker and ControlPlane are derived from the node-role labels;
	// Unschedulable mirrors the node's spec.unschedulable flag.
	Worker, ControlPlane, Unschedulable bool
}
// podState is the summary of a pod's status that remotePodState extracts from
// the API response: the pod name, its phase, and the most relevant
// reason/message pair.
type podState struct {
	Name, Phase, Reason, Message string
}
// kubeClient is a minimal Kubernetes API client.
type kubeClient struct {
	// baseURL is the scheme://host:port prefix prepended to every request path.
	baseURL string
	// token is sent as the bearer token in the Authorization header.
	token string
	// client performs the HTTP requests.
	client *http.Client
}
// inClusterKubeClient builds a kubeClient from the in-cluster environment.
//
// The API server address comes from KUBERNETES_SERVICE_HOST and
// KUBERNETES_SERVICE_PORT; the bearer token and CA bundle are read from the
// service-account directory under /var/run/secrets. It returns an error when
// either environment variable is empty, when a file cannot be read, or when
// the CA bundle contains no usable certificate.
func inClusterKubeClient() (*kubeClient, error) {
	const (
		tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
		caPath    = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
	)

	host := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_HOST"))
	port := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_PORT"))
	if host == "" || port == "" {
		return nil, fmt.Errorf("not running in cluster")
	}

	token, err := os.ReadFile(tokenPath)
	if err != nil {
		return nil, fmt.Errorf("read service account token: %w", err)
	}
	caPEM, err := os.ReadFile(caPath)
	if err != nil {
		return nil, fmt.Errorf("read kubernetes CA: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("append kubernetes CA: no valid certificate in %s", caPath)
	}

	return &kubeClient{
		// JoinHostPort wraps IPv6 literals in brackets, so the URL stays valid
		// when the service host is an IPv6 address.
		baseURL: "https://" + net.JoinHostPort(host, port),
		token:   strings.TrimSpace(string(token)),
		client: &http.Client{
			Timeout: 30 * time.Second,
			Transport: &http.Transport{
				// Trust only the cluster CA read above.
				TLSClientConfig: &tls.Config{RootCAs: pool},
			},
		},
	}, nil
}
// jsonRequest sends method to baseURL+path with the bearer token attached.
//
// A non-nil body is JSON-encoded and sent with a JSON Content-Type. Any
// response status of 300 or above becomes an error that includes up to 8 KiB
// of the response body. When out is non-nil the response body (capped at
// 1 MiB) is JSON-decoded into it; when out is nil the body is ignored.
func (k *kubeClient) jsonRequest(method, path string, body any, out any) error {
	hasBody := body != nil

	var payload io.Reader
	if hasBody {
		encoded, err := json.Marshal(body)
		if err != nil {
			return err
		}
		payload = bytes.NewReader(encoded)
	}

	req, err := http.NewRequest(method, k.baseURL+path, payload)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+k.token)
	if hasBody {
		req.Header.Set("Content-Type", "application/json")
	}

	resp, err := k.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	switch {
	case resp.StatusCode >= 300:
		// Only a bounded prefix of the error body is echoed back.
		snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 8192))
		return fmt.Errorf("%s %s failed: %s: %s", method, path, resp.Status, strings.TrimSpace(string(snippet)))
	case out == nil:
		return nil
	}

	const maxResponseBytes = 1 << 20
	return json.NewDecoder(io.LimitReader(resp.Body, maxResponseBytes)).Decode(out)
}
// deleteRequest issues an authenticated DELETE for baseURL+path.
//
// 200 OK, 202 Accepted and 404 Not Found all count as success, so deleting an
// object that is already gone is not an error. Any other status yields an
// error that includes up to 4 KiB of the response body.
func (k *kubeClient) deleteRequest(path string) error {
	req, err := http.NewRequest(http.MethodDelete, k.baseURL+path, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+k.token)

	resp, err := k.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK, http.StatusAccepted, http.StatusNotFound:
		return nil
	}

	detail, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
	return fmt.Errorf("delete %s failed: %s: %s", path, resp.Status, strings.TrimSpace(string(detail)))
}
// clusterNodes lists the cluster's nodes via GET /api/v1/nodes and returns
// them sorted by name.
//
// It is best-effort: when the in-cluster client cannot be built or the request
// fails, it returns nil instead of an error.
func clusterNodes() []clusterNode {
	kube, err := inClusterKubeClient()
	if err != nil {
		return nil
	}

	var payload struct {
		Items []struct {
			Metadata struct {
				Name   string            `json:"name"`
				Labels map[string]string `json:"labels"`
			} `json:"metadata"`
			Spec struct {
				Unschedulable bool `json:"unschedulable"`
			} `json:"spec"`
		} `json:"items"`
	}
	if err := kube.jsonRequest(http.MethodGet, "/api/v1/nodes", nil, &payload); err != nil {
		return nil
	}

	nodes := make([]clusterNode, 0, len(payload.Items))
	for _, item := range payload.Items {
		labels := item.Metadata.Labels

		// Role labels are commonly set with an empty value, so the role is
		// signalled by the key being present rather than by its value.
		_, controlPlane := labels["node-role.kubernetes.io/control-plane"]
		_, master := labels["node-role.kubernetes.io/master"]

		nodes = append(nodes, clusterNode{
			Name:          strings.TrimSpace(item.Metadata.Name),
			Arch:          strings.TrimSpace(labels["kubernetes.io/arch"]),
			Hardware:      strings.TrimSpace(labels["hardware"]),
			Worker:        labels["node-role.kubernetes.io/worker"] == "true",
			ControlPlane:  controlPlane || master,
			Unschedulable: item.Spec.Unschedulable,
		})
	}

	sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name })
	return nodes
}
// podImageForArch returns the configured runner image for arch ("arm64" or
// "amd64", surrounding whitespace ignored) with whitespace trimmed. Any other
// architecture yields the empty string.
func (a *App) podImageForArch(arch string) string {
	var image string
	switch strings.TrimSpace(arch) {
	case "arm64":
		image = a.settings.RunnerImageARM64
	case "amd64":
		image = a.settings.RunnerImageAMD64
	}
	return strings.TrimSpace(image)
}
func ( a * App ) runRemotePod ( jobID , podName string , podSpec map [ string ] any ) ( string , error ) {
kube , err := inClusterKubeClient ( )
if err != nil {
return "" , err
}
ns := url . PathEscape ( a . settings . Namespace )
_ = kube . deleteRequest ( fmt . Sprintf ( "/api/v1/namespaces/%s/pods/%s" , ns , url . PathEscape ( podName ) ) )
defer func ( ) {
_ = kube . deleteRequest ( fmt . Sprintf ( "/api/v1/namespaces/%s/pods/%s" , ns , url . PathEscape ( podName ) ) )
} ( )
if err := kube . jsonRequest ( http . MethodPost , fmt . Sprintf ( "/api/v1/namespaces/%s/pods" , ns ) , podSpec , nil ) ; err != nil {
return "" , err
}
deadline := time . Now ( ) . Add ( 12 * time . Minute )
2026-04-01 01:45:44 -03:00
lastState := podState { Name : podName }
2026-03-31 20:42:35 -03:00
for time . Now ( ) . Before ( deadline ) {
state , err := a . remotePodState ( kube , podName )
if err != nil {
return "" , err
}
2026-04-01 01:45:44 -03:00
lastState = state
if strings . TrimSpace ( jobID ) != "" {
2026-04-01 02:07:09 -03:00
a . heartbeatRemoteJob ( jobID )
2026-04-01 01:45:44 -03:00
}
2026-03-31 20:42:35 -03:00
switch state . Phase {
case "Succeeded" :
2026-04-01 01:45:44 -03:00
if strings . TrimSpace ( state . Message ) != "" {
return strings . TrimSpace ( state . Message ) , nil
}
2026-04-01 12:23:31 -03:00
logs , logErr := a . remotePodLogs ( kube , podName )
if strings . TrimSpace ( logs ) != "" {
return strings . TrimSpace ( logs ) , nil
}
if logErr != nil {
return "" , fmt . Errorf ( "remote pod %s succeeded but did not return a result payload; logs unavailable: %v" , podName , logErr )
}
return "" , fmt . Errorf ( "remote pod %s succeeded but did not return a result payload" , podName )
2026-03-31 20:42:35 -03:00
case "Failed" :
2026-04-01 01:45:44 -03:00
if strings . TrimSpace ( state . Message ) != "" {
return "" , fmt . Errorf ( "remote pod %s failed: %s" , podName , strings . TrimSpace ( state . Message ) )
}
2026-04-01 12:23:31 -03:00
if logs , logErr := a . remotePodLogs ( kube , podName ) ; logErr == nil && strings . TrimSpace ( logs ) != "" {
2026-03-31 20:42:35 -03:00
return "" , fmt . Errorf ( "remote pod %s failed: %s" , podName , strings . TrimSpace ( logs ) )
}
2026-04-01 12:23:31 -03:00
reason := strings . TrimSpace ( state . Reason )
if reason == "" {
reason = "remote worker failed before reporting details"
}
return "" , fmt . Errorf ( "remote pod %s failed: %s" , podName , reason )
2026-03-31 20:42:35 -03:00
}
time . Sleep ( 2 * time . Second )
}
2026-04-01 01:45:44 -03:00
if lastState . Phase != "" {
return "" , fmt . Errorf ( "remote pod %s timed out in phase %s: %s %s" , podName , lastState . Phase , strings . TrimSpace ( lastState . Reason ) , strings . TrimSpace ( lastState . Message ) )
}
2026-03-31 20:42:35 -03:00
return "" , fmt . Errorf ( "remote pod %s timed out" , podName )
}
// remotePodState fetches podName from the configured namespace and condenses
// its status into a podState.
//
// Reason and Message start from the pod-level status. If the first container
// status reports a waiting reason, that reason and its message replace them;
// a terminated reason then takes precedence over both, with its message
// applied only when non-blank. Container statuses after the first are not
// consulted. A request error yields the zero podState and the error.
func (a *App) remotePodState(kube *kubeClient, podName string) (podState, error) {
	// reasonMessage is the shape shared by the waiting and terminated states.
	type reasonMessage struct {
		Reason  string `json:"reason"`
		Message string `json:"message"`
	}

	var pod struct {
		Metadata struct {
			Name string `json:"name"`
		} `json:"metadata"`
		Status struct {
			Phase      string `json:"phase"`
			Reason     string `json:"reason"`
			Message    string `json:"message"`
			Conditions []struct {
				Type    string `json:"type"`
				Status  string `json:"status"`
				Reason  string `json:"reason"`
				Message string `json:"message"`
			} `json:"conditions"`
			ContainerStatuses []struct {
				State struct {
					Waiting    reasonMessage `json:"waiting"`
					Terminated reasonMessage `json:"terminated"`
				} `json:"state"`
			} `json:"containerStatuses"`
		} `json:"status"`
	}

	path := fmt.Sprintf(
		"/api/v1/namespaces/%s/pods/%s",
		url.PathEscape(a.settings.Namespace),
		url.PathEscape(podName),
	)
	if err := kube.jsonRequest(http.MethodGet, path, nil, &pod); err != nil {
		return podState{}, err
	}

	state := podState{
		Name:    pod.Metadata.Name,
		Phase:   pod.Status.Phase,
		Reason:  pod.Status.Reason,
		Message: pod.Status.Message,
	}

	statuses := pod.Status.ContainerStatuses
	if len(statuses) == 0 {
		return state, nil
	}

	first := statuses[0].State
	if wait := first.Waiting; strings.TrimSpace(wait.Reason) != "" {
		state.Reason, state.Message = wait.Reason, wait.Message
	}
	if term := first.Terminated; strings.TrimSpace(term.Reason) != "" {
		state.Reason = term.Reason
		// Keep the earlier message when the terminated state has none.
		if strings.TrimSpace(term.Message) != "" {
			state.Message = term.Message
		}
	}
	return state, nil
}
func ( a * App ) remotePodLogs ( kube * kubeClient , podName string ) ( string , error ) {
ns := url . PathEscape ( a . settings . Namespace )
req , err := http . NewRequest ( http . MethodGet , fmt . Sprintf ( "%s/api/v1/namespaces/%s/pods/%s/log" , kube . baseURL , ns , url . PathEscape ( podName ) ) , nil )
if err != nil {
return "" , err
}
req . Header . Set ( "Authorization" , "Bearer " + kube . token )
resp , err := kube . client . Do ( req )
if err != nil {
return "" , err
}
defer resp . Body . Close ( )
if resp . StatusCode >= 300 {
body , _ := io . ReadAll ( io . LimitReader ( resp . Body , 4096 ) )
2026-04-01 01:45:44 -03:00
message := strings . TrimSpace ( string ( body ) )
if strings . Contains ( message , "proxy error from 127.0.0.1:6443" ) || strings . Contains ( message , "containerLogs" ) {
return "" , fmt . Errorf ( "pod logs %s failed because Kubernetes could not reach the node kubelet log endpoint: %s" , podName , message )
}
return "" , fmt . Errorf ( "pod logs %s failed: %s: %s" , podName , resp . Status , message )
2026-03-31 20:42:35 -03:00
}
body , err := io . ReadAll ( io . LimitReader ( resp . Body , 1 << 20 ) )
if err != nil {
return "" , err
}
return string ( body ) , nil
}