metis: extend remote build watchdog and stream progress

This commit is contained in:
codex 2026-04-19 22:29:19 -03:00
parent db000e8e48
commit 7dc89e3bb5
8 changed files with 244 additions and 4 deletions

View File

@ -11,8 +11,10 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
"metis/pkg/image"
"metis/pkg/plan"
"metis/pkg/service"
"metis/pkg/writer"
@ -63,9 +65,45 @@ func remoteBuildCmd(args []string) {
}
output := filepath.Join(*workDir, fmt.Sprintf("%s.img", *node))
inv := loadInventory(*invPath)
if err := plan.BuildImageFile(context.Background(), inv, *node, *cacheDir, output); err != nil {
fatalf("build image: %v", err)
emitStageProgress("build", 12, fmt.Sprintf("Resolving the replacement build plan for %s", *node))
p, err := plan.Build(inv, *node, output, *cacheDir)
if err != nil {
fatalf("build plan: %v", err)
}
_, class, err := inv.FindNode(*node)
if err != nil {
fatalf("load node class: %v", err)
}
cacheImage := filepath.Join(*cacheDir, strings.TrimSuffix(filepath.Base(p.Image), ".xz"))
emitStageProgress("build", 16, fmt.Sprintf("Downloading and verifying the base image for %s", *node))
cacheImage, err = image.DownloadAndVerify(p.Image, cacheImage, class.Checksum)
if err != nil {
fatalf("download image: %v", err)
}
copyEmitter := newProgressEmitter("build", 20, 34, fmt.Sprintf("Copying the verified base image for %s", *node), false)
if err := writer.WriteImageWithProgress(context.Background(), cacheImage, output, copyEmitter); err != nil {
fatalf("copy base image: %v", err)
}
emitStageProgress("build", 36, fmt.Sprintf("Preparing node-specific injected files for %s", *node))
files, err := plan.Files(inv, *node)
if err != nil {
fatalf("resolve files: %v", err)
}
rootfsProgress := map[string]service.RemoteProgressUpdate{
image.RootFSProgressFindingPartition: {Stage: "build", ProgressPct: 40, Message: fmt.Sprintf("Finding the Linux root partition for %s", *node)},
image.RootFSProgressExtracting: {Stage: "build", ProgressPct: 44, Message: fmt.Sprintf("Extracting the Linux root partition for %s", *node)},
image.RootFSProgressWritingFiles: {Stage: "build", ProgressPct: 50, Message: fmt.Sprintf("Injecting node-specific files into the root filesystem for %s", *node)},
image.RootFSProgressReplacing: {Stage: "build", ProgressPct: 56, Message: fmt.Sprintf("Replacing the root partition inside the replacement image for %s", *node)},
}
if err := image.InjectRootFSWithProgress(output, files, func(step string) {
if update, ok := rootfsProgress[step]; ok {
emitProgress(update)
}
}); err != nil {
fatalf("inject rootfs: %v", err)
}
emitStageProgress("build", 58, fmt.Sprintf("Built the replacement image filesystem for %s", *node))
emitStageProgress("build", 60, fmt.Sprintf("Compressing the replacement image for %s before upload", *node))
if err := exec.Command("xz", "-T0", "-z", "-f", output).Run(); err != nil {
fatalf("xz compress: %v", err)
}
@ -92,13 +130,17 @@ func remoteBuildCmd(args []string) {
if err := os.WriteFile(metadataPath, metaBytes, 0o644); err != nil {
fatalf("write metadata: %v", err)
}
emitStageProgress("build", 68, fmt.Sprintf("Compression complete for %s; preparing the Harbor upload", *node))
emitStageProgress("build", 70, fmt.Sprintf("Authenticating to Harbor for %s", *node))
if err := orasLogin(*harborRegistry, *harborUsername, *harborPassword); err != nil {
fatalf("oras login: %v", err)
}
taggedRef := fmt.Sprintf("%s:%s", *artifactRef, *buildTag)
emitStageProgress("build", 72, fmt.Sprintf("Uploading %s to Harbor", filepath.Base(compressedPath)))
if err := orasPush(taggedRef, compressedPath, metadataPath); err != nil {
fatalf("oras push: %v", err)
}
emitStageProgress("build", 76, fmt.Sprintf("Refreshing the latest Harbor tag for %s", *node))
if err := orasTag(taggedRef, "latest"); err != nil {
fatalf("oras tag latest: %v", err)
}
@ -133,18 +175,21 @@ func remoteFlashCmd(args []string) {
if err := os.MkdirAll(*workDir, 0o755); err != nil {
fatalf("mkdir workdir: %v", err)
}
emitStageProgress("flash", 84, fmt.Sprintf("Pulling the latest Harbor artifact for %s", *node))
if err := orasLogin(*harborRegistry, *harborUsername, *harborPassword); err != nil {
fatalf("oras login: %v", err)
}
if err := orasPull(fmt.Sprintf("%s:latest", *artifactRef), *workDir); err != nil {
fatalf("oras pull: %v", err)
}
emitStageProgress("flash", 88, fmt.Sprintf("Preparing the downloaded image for %s", *node))
imagePath, compressed, err := resolvePulledArtifact(*workDir)
if err != nil {
fatalf("resolve artifact: %v", err)
}
rawImage := imagePath
if compressed {
emitStageProgress("flash", 90, fmt.Sprintf("Decompressing the image for %s before writing", *node))
rawImage = filepath.Join(*workDir, fmt.Sprintf("%s.img", *node))
cmd := exec.Command("sh", "-lc", fmt.Sprintf("xz -dc '%s' > '%s'", imagePath, rawImage))
if out, err := cmd.CombinedOutput(); err != nil {
@ -159,9 +204,12 @@ func remoteFlashCmd(args []string) {
}
destPath = filepath.Join(*hostTmpDir, fmt.Sprintf("%s.img", *node))
}
if err := writer.WriteImage(context.Background(), rawImage, destPath); err != nil {
emitStageProgress("flash", 92, fmt.Sprintf("Writing the latest image for %s to %s", *node, destPath))
writeEmitter := newProgressEmitter("flash", 92, 98, fmt.Sprintf("Writing the latest image for %s", *node), true)
if err := writer.WriteImageWithProgress(context.Background(), rawImage, destPath, writeEmitter); err != nil {
fatalf("write image: %v", err)
}
emitStageProgress("flash", 99, fmt.Sprintf("Flushing the finished image for %s", *node))
_ = exec.Command("sync").Run()
if strings.HasPrefix(destPath, "/dev/") {
_ = exec.Command("blockdev", "--flushbufs", destPath).Run()
@ -192,6 +240,55 @@ func writeStructuredResult(payload any) {
_ = os.WriteFile("/dev/termination-log", data, 0o644)
}
func emitStageProgress(stage string, progress float64, message string) {
emitProgress(service.RemoteProgressUpdate{
Stage: stage,
ProgressPct: progress,
Message: message,
})
}
// emitProgress writes a single progress marker line to stdout. Updates whose
// formatted line is empty or whitespace-only (ProgressLogLine returns "" when
// the update cannot be marshalled) are dropped without output.
func emitProgress(update service.RemoteProgressUpdate) {
	if marker := service.ProgressLogLine(update); len(strings.TrimSpace(marker)) > 0 {
		fmt.Fprintln(os.Stdout, marker)
	}
}
// newProgressEmitter returns a writer.ProgressFunc that maps byte progress
// (written/total) linearly onto the percentage window [minPct, maxPct] and
// emits it as a progress marker for the given stage.
//
// Parameters:
//   - stage, message: copied verbatim into every emitted update.
//   - minPct, maxPct: percentage reported at 0 bytes and at completion; the
//     computed value is capped at maxPct.
//   - includeBytes: when true, the raw written/total byte counts are attached
//     to each update.
//
// Emission is throttled: an update is skipped when it advances less than 0.5
// percentage points AND less than one second has passed since the previous
// emission. The first callback reporting written >= total is always emitted,
// so the final percentage and byte counts are never lost to the throttle.
// Callbacks with total <= 0 are ignored because no ratio can be computed.
// The returned function serializes its internal state with a mutex.
func newProgressEmitter(stage string, minPct, maxPct float64, message string, includeBytes bool) writer.ProgressFunc {
	var (
		mu        sync.Mutex
		lastPct   = minPct
		lastEmit  time.Time // zero value: the very first callback is never throttled
		completed bool      // set once the completion update has been emitted
	)
	return func(written, total int64) {
		if total <= 0 {
			return
		}
		pct := minPct + (float64(written)/float64(total))*(maxPct-minPct)
		if pct > maxPct {
			pct = maxPct
		}
		done := written >= total

		mu.Lock()
		defer mu.Unlock()
		now := time.Now()
		// The first completion report bypasses the throttle; everything else
		// is rate-limited by progress delta and elapsed time.
		forceFinal := done && !completed
		if !forceFinal && pct-lastPct < 0.5 && now.Sub(lastEmit) < time.Second {
			return
		}
		update := service.RemoteProgressUpdate{
			Stage:       stage,
			ProgressPct: pct,
			Message:     message,
		}
		if includeBytes {
			update.WrittenBytes = written
			update.TotalBytes = total
		}
		emitProgress(update)
		lastPct = pct
		lastEmit = now
		if done {
			completed = true
		}
	}
}
func localFlashDevices(maxBytes int64, hostTmpDir string) ([]service.Device, error) {
cmd := exec.Command("lsblk", "-J", "-b", "-o", "NAME,PATH,RM,HOTPLUG,SIZE,MODEL,TRAN,TYPE")
out, err := cmd.Output()

View File

@ -29,9 +29,23 @@ type partitionTablePart struct {
Type string `json:"type"`
}
// RootFSProgressFunc is a callback that receives the name of the rootfs
// injection step that is about to start. The step is one of the
// RootFSProgress* constants below.
type RootFSProgressFunc func(step string)
// Step identifiers passed to a RootFSProgressFunc by InjectRootFSWithProgress,
// listed in the order they are reported.
const (
// RootFSProgressFindingPartition is reported before the Linux partition is located in the image.
RootFSProgressFindingPartition = "finding-partition"
// RootFSProgressExtracting is reported before the root partition is extracted to a work file.
RootFSProgressExtracting = "extracting-partition"
// RootFSProgressWritingFiles is reported before the rootfs files are written into the extracted partition.
RootFSProgressWritingFiles = "writing-rootfs-files"
// RootFSProgressReplacing is reported before the modified partition is written back into the image.
RootFSProgressReplacing = "replacing-partition"
)
// InjectRootFS rewrites the Linux root partition inside the raw image at
// imagePath without requiring block-device mounts. Only rootfs-targeted
// entries of files are written. It is the silent variant of
// InjectRootFSWithProgress: no progress callback is installed.
func InjectRootFS(imagePath string, files []inject.FileSpec) error {
	var noProgress RootFSProgressFunc // nil: step changes are not reported
	return InjectRootFSWithProgress(imagePath, files, noProgress)
}
// InjectRootFSWithProgress emits coarse step changes while rewriting the root partition.
func InjectRootFSWithProgress(imagePath string, files []inject.FileSpec, progress RootFSProgressFunc) error {
rootFiles := make([]inject.FileSpec, 0, len(files))
for _, f := range files {
if f.RootFS {
@ -42,6 +56,7 @@ func InjectRootFS(imagePath string, files []inject.FileSpec) error {
return nil
}
emitRootFSProgress(progress, RootFSProgressFindingPartition)
part, sectorSize, err := findLinuxPartition(imagePath)
if err != nil {
return err
@ -54,15 +69,24 @@ func InjectRootFS(imagePath string, files []inject.FileSpec) error {
defer os.RemoveAll(workDir)
rootImage := filepath.Join(workDir, "root.ext4")
emitRootFSProgress(progress, RootFSProgressExtracting)
if err := extractPartition(imagePath, rootImage, part, sectorSize); err != nil {
return err
}
emitRootFSProgress(progress, RootFSProgressWritingFiles)
if err := writeExt4Files(rootImage, rootFiles); err != nil {
return err
}
emitRootFSProgress(progress, RootFSProgressReplacing)
return replacePartition(imagePath, rootImage, part, sectorSize)
}
// emitRootFSProgress forwards step to the progress callback, tolerating a nil
// callback so callers do not have to guard every invocation.
func emitRootFSProgress(progress RootFSProgressFunc, step string) {
	if progress == nil {
		return
	}
	progress(step)
}
func findLinuxPartition(imagePath string) (partitionTablePart, uint64, error) {
out, err := exec.Command("sfdisk", "-J", imagePath).Output()
if err != nil {

View File

@ -231,7 +231,11 @@ func (a *App) runRemotePod(jobID, podName string, podSpec map[string]any) (strin
return "", err
}
deadline := time.Now().Add(12 * time.Minute)
timeout := time.Duration(a.settings.RemotePodTimeout) * time.Second
if timeout < 5*time.Minute {
timeout = 5 * time.Minute
}
deadline := time.Now().Add(timeout)
lastState := podState{Name: podName}
for time.Now().Before(deadline) {
state, err := a.remotePodState(kube, podName)
@ -240,6 +244,11 @@ func (a *App) runRemotePod(jobID, podName string, podSpec map[string]any) (strin
}
lastState = state
if strings.TrimSpace(jobID) != "" {
if logs, logErr := a.remotePodLogs(kube, podName); logErr == nil {
if update, ok := parseRemoteProgressLogs(logs); ok {
a.applyRemoteProgress(jobID, update)
}
}
a.heartbeatRemoteJob(jobID)
}
switch state.Phase {

View File

@ -22,6 +22,7 @@ func TestSettingsHelpersAndSmallUtilities(t *testing.T) {
t.Setenv("METIS_MAX_DEVICE_BYTES", "12345")
t.Setenv("METIS_DEFAULT_FLASH_HOST", "flash-1")
t.Setenv("METIS_LOCAL_HOST", "local-1")
t.Setenv("METIS_REMOTE_POD_TIMEOUT_SEC", "1800")
settings := FromEnv()
if got, want := settings.CacheDir, filepath.Join(dataDir, "cache"); got != want {
@ -33,6 +34,9 @@ func TestSettingsHelpersAndSmallUtilities(t *testing.T) {
if settings.MaxDeviceBytes != 12345 {
t.Fatalf("expected MaxDeviceBytes=12345, got %d", settings.MaxDeviceBytes)
}
if settings.RemotePodTimeout != 1800 {
t.Fatalf("expected RemotePodTimeout=1800, got %d", settings.RemotePodTimeout)
}
if !reflect.DeepEqual(splitList("a, b,, c"), []string{"a", "b", "c"}) {
t.Fatalf("splitList mismatch")
}

View File

@ -270,6 +270,17 @@ func (a *App) heartbeatRemoteJob(jobID string) {
}
j.Message = fmt.Sprintf("Validating %s on %s and resolving the latest Harbor artifact", prettyDeviceTarget(j.Device), j.Host)
case "flash":
if j.Total > 0 && j.Written > 0 {
actual := 88 + (float64(j.Written)/float64(j.Total))*10
if actual > 98 {
actual = 98
}
if actual > j.ProgressPct {
j.ProgressPct = actual
}
j.Message = fmt.Sprintf("Writing %s of %s on %s", humanBytes(j.Written), humanBytes(j.Total), j.Host)
return
}
progress, message := flashStageHeartbeat(j.Host, j.Artifact, elapsed)
if progress > j.ProgressPct {
j.ProgressPct = progress

View File

@ -43,3 +43,19 @@ func TestFlashStageHeartbeatProgresses(t *testing.T) {
t.Fatalf("expected flushing message, got %q", m3)
}
}
// TestParseRemoteProgressLogsFindsLatestMarker checks that, when a log stream
// mixes plain lines with several progress markers, the parser reports the
// marker that appears last.
func TestParseRemoteProgressLogsFindsLatestMarker(t *testing.T) {
	earlier := ProgressLogLine(RemoteProgressUpdate{Stage: "build", ProgressPct: 40, Message: "phase one"})
	later := ProgressLogLine(RemoteProgressUpdate{Stage: "build", ProgressPct: 72, Message: "phase two"})
	lines := []string{
		"plain log line",
		earlier,
		later,
		`{"local_path":"/tmp/out.img.xz"}`,
	}

	got, found := parseRemoteProgressLogs(strings.Join(lines, "\n"))
	if !found {
		t.Fatal("expected progress marker")
	}
	pctOK := got.ProgressPct == 72
	msgOK := got.Message == "phase two"
	if !(pctOK && msgOK) {
		t.Fatalf("unexpected update: %#v", got)
	}
}

View File

@ -0,0 +1,77 @@
package service
import (
"bufio"
"encoding/json"
"strings"
"time"
)
// progressLogPrefix marks a stdout line as a machine-readable progress
// update; the JSON payload follows the prefix on the same line.
const progressLogPrefix = "METIS_PROGRESS "

// RemoteProgressUpdate is emitted by remote workers so the UI can show
// concrete stage transitions instead of relying only on elapsed-time guesses.
// Every field is tagged omitempty, so zero values are left out of the JSON.
type RemoteProgressUpdate struct {
	Stage        string  `json:"stage,omitempty"`
	ProgressPct  float64 `json:"progress_pct,omitempty"`
	Message      string  `json:"message,omitempty"`
	WrittenBytes int64   `json:"written_bytes,omitempty"`
	TotalBytes   int64   `json:"total_bytes,omitempty"`
}

// ProgressLogLine formats a progress update for remote worker stdout as
// progressLogPrefix followed by the update's JSON encoding. It returns an
// empty string when the update cannot be marshalled.
func ProgressLogLine(update RemoteProgressUpdate) string {
	data, err := json.Marshal(update)
	if err != nil {
		return ""
	}
	return progressLogPrefix + string(data)
}

// parseRemoteProgressLogs scans logs line by line and returns the last
// progress marker whose payload decodes as a RemoteProgressUpdate. The boolean
// is false when no valid marker is present. Lines without the prefix and
// markers with malformed JSON are skipped.
//
// Lines are read with bufio.Reader.ReadString rather than bufio.Scanner:
// Scanner stops with ErrTooLong at its maximum token size, which would
// silently hide every marker that follows a single oversized log line.
func parseRemoteProgressLogs(logs string) (RemoteProgressUpdate, bool) {
	reader := bufio.NewReader(strings.NewReader(logs))
	var latest RemoteProgressUpdate
	found := false
	for {
		// On the final line ReadString returns the remaining text together
		// with a non-nil error, so the line is processed before the loop ends.
		line, readErr := reader.ReadString('\n')
		line = strings.TrimSpace(line)
		if strings.HasPrefix(line, progressLogPrefix) {
			raw := strings.TrimSpace(strings.TrimPrefix(line, progressLogPrefix))
			var update RemoteProgressUpdate
			if err := json.Unmarshal([]byte(raw), &update); err == nil {
				latest = update
				found = true
			}
		}
		if readErr != nil {
			break
		}
	}
	return latest, found
}
// applyRemoteProgress merges a worker-reported progress update into the job
// identified by jobID. A blank jobID is ignored, and the job is only touched
// while its status is JobRunning.
//
// Merge rules: a non-blank stage that differs from the current one replaces
// it and restarts StageStartedAt; the percentage only ever moves forward; a
// non-blank message replaces the current message; byte counters are copied
// only when they are positive.
func (a *App) applyRemoteProgress(jobID string, update RemoteProgressUpdate) {
	if len(strings.TrimSpace(jobID)) == 0 {
		return
	}
	newStage := strings.TrimSpace(update.Stage)
	newMessage := strings.TrimSpace(update.Message)

	merge := func(j *Job) {
		if j == nil {
			return
		}
		if j.Status != JobRunning {
			return
		}
		if newStage != "" && newStage != j.Stage {
			j.Stage = newStage
			j.StageStartedAt = time.Now().UTC()
		}
		if j.ProgressPct < update.ProgressPct {
			j.ProgressPct = update.ProgressPct
		}
		if newMessage != "" {
			j.Message = newMessage
		}
		if update.WrittenBytes > 0 {
			j.Written = update.WrittenBytes
		}
		if update.TotalBytes > 0 {
			j.Total = update.TotalBytes
		}
	}
	a.setJob(jobID, merge)
}

View File

@ -33,6 +33,7 @@ type Settings struct {
HarborUsername string
HarborPassword string
HostTmpDir string
RemotePodTimeout int64
}
// FromEnv builds service settings with sensible defaults for local dev and in-cluster use.
@ -64,6 +65,7 @@ func FromEnv() Settings {
HarborUsername: getenvDefault("METIS_HARBOR_USERNAME", ""),
HarborPassword: getenvDefault("METIS_HARBOR_PASSWORD", ""),
HostTmpDir: getenvDefault("METIS_HOST_TMP_DIR", "/tmp/metis-flash-test"),
RemotePodTimeout: getenvInt64("METIS_REMOTE_POD_TIMEOUT_SEC", 1800),
}
}