service: avoid kubelet log dependency for remote workers

This commit is contained in:
Brad Stein 2026-04-01 01:45:44 -03:00
parent bd61275821
commit 801374d184
4 changed files with 75 additions and 11 deletions

View File

@ -40,10 +40,7 @@ func remoteDevicesCmd(args []string) {
} }
return devices[i].Path < devices[j].Path return devices[i].Path < devices[j].Path
}) })
writeStructuredResult(map[string]any{"devices": devices})
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
_ = enc.Encode(map[string]any{"devices": devices})
} }
func remoteBuildCmd(args []string) { func remoteBuildCmd(args []string) {
@ -116,9 +113,7 @@ func remoteBuildCmd(args []string) {
UpdatedAt: builtAt, UpdatedAt: builtAt,
SizeBytes: info.Size(), SizeBytes: info.Size(),
} }
enc := json.NewEncoder(os.Stdout) writeStructuredResult(summary)
enc.SetIndent("", " ")
_ = enc.Encode(summary)
} }
func remoteFlashCmd(args []string) { func remoteFlashCmd(args []string) {
@ -177,9 +172,7 @@ func remoteFlashCmd(args []string) {
if err != nil { if err != nil {
log.Fatalf("stat destination: %v", err) log.Fatalf("stat destination: %v", err)
} }
enc := json.NewEncoder(os.Stdout) writeStructuredResult(map[string]any{
enc.SetIndent("", " ")
_ = enc.Encode(map[string]any{
"node": *node, "node": *node,
"device": *device, "device": *device,
"dest_path": destPath, "dest_path": destPath,
@ -187,6 +180,19 @@ func remoteFlashCmd(args []string) {
}) })
} }
func writeStructuredResult(payload any) {
data, err := json.Marshal(payload)
if err != nil {
log.Fatalf("encode result: %v", err)
}
if _, err := os.Stdout.Write(append(data, '\n')); err != nil {
log.Fatalf("write stdout result: %v", err)
}
// Keep the result available in pod status so Metis does not depend on the
// kubelet log endpoint for successful worker runs.
_ = os.WriteFile("/dev/termination-log", data, 0o644)
}
func localFlashDevices(maxBytes int64, hostTmpDir string) ([]service.Device, error) { func localFlashDevices(maxBytes int64, hostTmpDir string) ([]service.Device, error) {
cmd := exec.Command("lsblk", "-J", "-b", "-o", "NAME,PATH,RM,HOTPLUG,SIZE,MODEL,TRAN,TYPE") cmd := exec.Command("lsblk", "-J", "-b", "-o", "NAME,PATH,RM,HOTPLUG,SIZE,MODEL,TRAN,TYPE")
out, err := cmd.Output() out, err := cmd.Output()

View File

@ -57,6 +57,7 @@ type Job struct {
Total int64 `json:"total_bytes,omitempty"` Total int64 `json:"total_bytes,omitempty"`
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
StartedAt time.Time `json:"started_at"` StartedAt time.Time `json:"started_at"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
FinishedAt time.Time `json:"finished_at,omitempty"` FinishedAt time.Time `json:"finished_at,omitempty"`
} }
@ -314,6 +315,7 @@ func (a *App) newJob(kind, node, host, device string) *Job {
Status: JobQueued, Status: JobQueued,
ProgressPct: 0, ProgressPct: 0,
StartedAt: time.Now().UTC(), StartedAt: time.Now().UTC(),
UpdatedAt: time.Now().UTC(),
} }
a.mu.Lock() a.mu.Lock()
a.jobs[job.ID] = job a.jobs[job.ID] = job
@ -335,6 +337,7 @@ func (a *App) setJob(id string, update func(*Job)) {
return return
} }
update(job) update(job)
job.UpdatedAt = time.Now().UTC()
} }
func (a *App) failJob(id string, err error) { func (a *App) failJob(id string, err error) {
@ -356,6 +359,7 @@ func (a *App) completeJob(id string, update func(*Job)) {
if job.Status != JobError { if job.Status != JobError {
job.Status = JobDone job.Status = JobDone
} }
job.UpdatedAt = time.Now().UTC()
job.FinishedAt = time.Now().UTC() job.FinishedAt = time.Now().UTC()
} }

View File

@ -178,15 +178,26 @@ func (a *App) runRemotePod(jobID, podName string, podSpec map[string]any) (strin
} }
deadline := time.Now().Add(12 * time.Minute) deadline := time.Now().Add(12 * time.Minute)
lastState := podState{Name: podName}
for time.Now().Before(deadline) { for time.Now().Before(deadline) {
state, err := a.remotePodState(kube, podName) state, err := a.remotePodState(kube, podName)
if err != nil { if err != nil {
return "", err return "", err
} }
lastState = state
if strings.TrimSpace(jobID) != "" {
a.setJob(jobID, func(_ *Job) {})
}
switch state.Phase { switch state.Phase {
case "Succeeded": case "Succeeded":
if strings.TrimSpace(state.Message) != "" {
return strings.TrimSpace(state.Message), nil
}
return a.remotePodLogs(kube, podName) return a.remotePodLogs(kube, podName)
case "Failed": case "Failed":
if strings.TrimSpace(state.Message) != "" {
return "", fmt.Errorf("remote pod %s failed: %s", podName, strings.TrimSpace(state.Message))
}
logs, _ := a.remotePodLogs(kube, podName) logs, _ := a.remotePodLogs(kube, podName)
if strings.TrimSpace(logs) != "" { if strings.TrimSpace(logs) != "" {
return "", fmt.Errorf("remote pod %s failed: %s", podName, strings.TrimSpace(logs)) return "", fmt.Errorf("remote pod %s failed: %s", podName, strings.TrimSpace(logs))
@ -195,6 +206,9 @@ func (a *App) runRemotePod(jobID, podName string, podSpec map[string]any) (strin
} }
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
} }
if lastState.Phase != "" {
return "", fmt.Errorf("remote pod %s timed out in phase %s: %s %s", podName, lastState.Phase, strings.TrimSpace(lastState.Reason), strings.TrimSpace(lastState.Message))
}
return "", fmt.Errorf("remote pod %s timed out", podName) return "", fmt.Errorf("remote pod %s timed out", podName)
} }
@ -268,7 +282,11 @@ func (a *App) remotePodLogs(kube *kubeClient, podName string) (string, error) {
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode >= 300 { if resp.StatusCode >= 300 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return "", fmt.Errorf("pod logs %s failed: %s: %s", podName, resp.Status, strings.TrimSpace(string(body))) message := strings.TrimSpace(string(body))
if strings.Contains(message, "proxy error from 127.0.0.1:6443") || strings.Contains(message, "containerLogs") {
return "", fmt.Errorf("pod logs %s failed because Kubernetes could not reach the node kubelet log endpoint: %s", podName, message)
}
return "", fmt.Errorf("pod logs %s failed: %s: %s", podName, resp.Status, message)
} }
body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
if err != nil { if err != nil {

View File

@ -580,6 +580,7 @@ var metisPage = template.Must(template.New("metis").Parse(`<!doctype html>
const boot = JSON.parse(document.getElementById('boot').textContent); const boot = JSON.parse(document.getElementById('boot').textContent);
let state = boot; let state = boot;
let busy = false; let busy = false;
let lastJobAlert = '';
const nodeSelect = document.getElementById('node-select'); const nodeSelect = document.getElementById('node-select');
const hostSelect = document.getElementById('host-select'); const hostSelect = document.getElementById('host-select');
@ -615,6 +616,22 @@ var metisPage = template.Must(template.New("metis").Parse(`<!doctype html>
return size.toFixed(size >= 10 || idx === 0 ? 0 : 1) + ' ' + units[idx]; return size.toFixed(size >= 10 || idx === 0 ? 0 : 1) + ' ' + units[idx];
} }
function fmtDuration(startValue, endValue){
if(!startValue){ return ''; }
const start = new Date(startValue);
if(isNaN(start.getTime())){ return ''; }
const end = endValue ? new Date(endValue) : new Date();
if(isNaN(end.getTime())){ return ''; }
let seconds = Math.max(0, Math.round((end.getTime() - start.getTime()) / 1000));
const hours = Math.floor(seconds / 3600);
seconds -= hours * 3600;
const minutes = Math.floor(seconds / 60);
seconds -= minutes * 60;
if(hours){ return hours + 'h ' + minutes + 'm'; }
if(minutes){ return minutes + 'm ' + seconds + 's'; }
return seconds + 's';
}
function banner(kind, title, text){ function banner(kind, title, text){
bannerEl.className = 'banner ' + kind; bannerEl.className = 'banner ' + kind;
bannerTitleEl.textContent = title; bannerTitleEl.textContent = title;
@ -667,6 +684,15 @@ var metisPage = template.Must(template.New("metis").Parse(`<!doctype html>
const statusClass = job.status === 'error' ? 'error' : (job.status === 'done' ? 'done' : (job.status === 'running' ? 'running' : '')); const statusClass = job.status === 'error' ? 'error' : (job.status === 'done' ? 'done' : (job.status === 'running' ? 'running' : ''));
const title = job.kind.toUpperCase() + (job.node ? ' · ' + job.node : ''); const title = job.kind.toUpperCase() + (job.node ? ' · ' + job.node : '');
const started = fmtTime(job.started_at) + (job.device ? ' · ' + job.device : '') + (job.host ? ' · ' + job.host : ''); const started = fmtTime(job.started_at) + (job.device ? ' · ' + job.device : '') + (job.host ? ' · ' + job.host : '');
const timingBits = [];
if(job.stage){ timingBits.push('stage: ' + job.stage); }
const duration = fmtDuration(job.started_at, job.finished_at);
if(duration){
timingBits.push((job.status === 'running' ? 'elapsed ' : 'duration ') + duration);
}
if(job.updated_at && job.status === 'running'){
timingBits.push('last update ' + fmtDuration(job.updated_at, new Date().toISOString()) + ' ago');
}
const detailBits = []; const detailBits = [];
if(job.written_bytes){ detailBits.push(fmtBytes(job.written_bytes) + ' / ' + fmtBytes(job.total_bytes)); } if(job.written_bytes){ detailBits.push(fmtBytes(job.written_bytes) + ' / ' + fmtBytes(job.total_bytes)); }
if(job.artifact){ detailBits.push(job.artifact); } if(job.artifact){ detailBits.push(job.artifact); }
@ -678,10 +704,20 @@ var metisPage = template.Must(template.New("metis").Parse(`<!doctype html>
'</div>' + '</div>' +
'<div>' + (job.message || job.stage || 'queued') + '</div>' + '<div>' + (job.message || job.stage || 'queued') + '</div>' +
'<div class="meta">' + started + '</div>' + '<div class="meta">' + started + '</div>' +
'<div class="meta">' + timingBits.join(' · ') + '</div>' +
'<div class="meta">' + detailBits.join(' · ') + '</div>' + '<div class="meta">' + detailBits.join(' · ') + '</div>' +
'<div class="bar"><span style="width:' + Math.max(0, Math.min(100, job.progress_pct || 0)) + '%"></span></div>'; '<div class="bar"><span style="width:' + Math.max(0, Math.min(100, job.progress_pct || 0)) + '%"></span></div>';
jobsEl.appendChild(wrap); jobsEl.appendChild(wrap);
}); });
const newestError = jobs.find((job)=>job.status === 'error');
if(newestError){
const signature = [newestError.id, newestError.error || newestError.message || newestError.stage || 'error'].join(':');
if(signature !== lastJobAlert){
lastJobAlert = signature;
banner('error', 'Metis job failed', newestError.error || newestError.message || 'Check the live jobs panel for details.');
}
}
} }
function renderEvents(){ function renderEvents(){