diff --git a/cmd/metis/remote_cmd.go b/cmd/metis/remote_cmd.go index 9445281..11af42e 100644 --- a/cmd/metis/remote_cmd.go +++ b/cmd/metis/remote_cmd.go @@ -11,8 +11,10 @@ import ( "sort" "strconv" "strings" + "sync" "time" + "metis/pkg/image" "metis/pkg/plan" "metis/pkg/service" "metis/pkg/writer" @@ -63,9 +65,45 @@ func remoteBuildCmd(args []string) { } output := filepath.Join(*workDir, fmt.Sprintf("%s.img", *node)) inv := loadInventory(*invPath) - if err := plan.BuildImageFile(context.Background(), inv, *node, *cacheDir, output); err != nil { - fatalf("build image: %v", err) + emitStageProgress("build", 12, fmt.Sprintf("Resolving the replacement build plan for %s", *node)) + p, err := plan.Build(inv, *node, output, *cacheDir) + if err != nil { + fatalf("build plan: %v", err) } + _, class, err := inv.FindNode(*node) + if err != nil { + fatalf("load node class: %v", err) + } + cacheImage := filepath.Join(*cacheDir, strings.TrimSuffix(filepath.Base(p.Image), ".xz")) + emitStageProgress("build", 16, fmt.Sprintf("Downloading and verifying the base image for %s", *node)) + cacheImage, err = image.DownloadAndVerify(p.Image, cacheImage, class.Checksum) + if err != nil { + fatalf("download image: %v", err) + } + copyEmitter := newProgressEmitter("build", 20, 34, fmt.Sprintf("Copying the verified base image for %s", *node), false) + if err := writer.WriteImageWithProgress(context.Background(), cacheImage, output, copyEmitter); err != nil { + fatalf("copy base image: %v", err) + } + emitStageProgress("build", 36, fmt.Sprintf("Preparing node-specific injected files for %s", *node)) + files, err := plan.Files(inv, *node) + if err != nil { + fatalf("resolve files: %v", err) + } + rootfsProgress := map[string]service.RemoteProgressUpdate{ + image.RootFSProgressFindingPartition: {Stage: "build", ProgressPct: 40, Message: fmt.Sprintf("Finding the Linux root partition for %s", *node)}, + image.RootFSProgressExtracting: {Stage: "build", ProgressPct: 44, Message: 
fmt.Sprintf("Extracting the Linux root partition for %s", *node)}, + image.RootFSProgressWritingFiles: {Stage: "build", ProgressPct: 50, Message: fmt.Sprintf("Injecting node-specific files into the root filesystem for %s", *node)}, + image.RootFSProgressReplacing: {Stage: "build", ProgressPct: 56, Message: fmt.Sprintf("Replacing the root partition inside the replacement image for %s", *node)}, + } + if err := image.InjectRootFSWithProgress(output, files, func(step string) { + if update, ok := rootfsProgress[step]; ok { + emitProgress(update) + } + }); err != nil { + fatalf("inject rootfs: %v", err) + } + emitStageProgress("build", 58, fmt.Sprintf("Built the replacement image filesystem for %s", *node)) + emitStageProgress("build", 60, fmt.Sprintf("Compressing the replacement image for %s before upload", *node)) if err := exec.Command("xz", "-T0", "-z", "-f", output).Run(); err != nil { fatalf("xz compress: %v", err) } @@ -92,13 +130,17 @@ func remoteBuildCmd(args []string) { if err := os.WriteFile(metadataPath, metaBytes, 0o644); err != nil { fatalf("write metadata: %v", err) } + emitStageProgress("build", 68, fmt.Sprintf("Compression complete for %s; preparing the Harbor upload", *node)) + emitStageProgress("build", 70, fmt.Sprintf("Authenticating to Harbor for %s", *node)) if err := orasLogin(*harborRegistry, *harborUsername, *harborPassword); err != nil { fatalf("oras login: %v", err) } taggedRef := fmt.Sprintf("%s:%s", *artifactRef, *buildTag) + emitStageProgress("build", 72, fmt.Sprintf("Uploading %s to Harbor", filepath.Base(compressedPath))) if err := orasPush(taggedRef, compressedPath, metadataPath); err != nil { fatalf("oras push: %v", err) } + emitStageProgress("build", 76, fmt.Sprintf("Refreshing the latest Harbor tag for %s", *node)) if err := orasTag(taggedRef, "latest"); err != nil { fatalf("oras tag latest: %v", err) } @@ -133,18 +175,21 @@ func remoteFlashCmd(args []string) { if err := os.MkdirAll(*workDir, 0o755); err != nil { fatalf("mkdir 
workdir: %v", err)
 	}
+	emitStageProgress("flash", 84, fmt.Sprintf("Pulling the latest Harbor artifact for %s", *node))
 	if err := orasLogin(*harborRegistry, *harborUsername, *harborPassword); err != nil {
 		fatalf("oras login: %v", err)
 	}
 	if err := orasPull(fmt.Sprintf("%s:latest", *artifactRef), *workDir); err != nil {
 		fatalf("oras pull: %v", err)
 	}
+	emitStageProgress("flash", 88, fmt.Sprintf("Preparing the downloaded image for %s", *node))
 	imagePath, compressed, err := resolvePulledArtifact(*workDir)
 	if err != nil {
 		fatalf("resolve artifact: %v", err)
 	}
 	rawImage := imagePath
 	if compressed {
+		emitStageProgress("flash", 90, fmt.Sprintf("Decompressing the image for %s before writing", *node))
 		rawImage = filepath.Join(*workDir, fmt.Sprintf("%s.img", *node))
 		cmd := exec.Command("sh", "-lc", fmt.Sprintf("xz -dc '%s' > '%s'", imagePath, rawImage))
 		if out, err := cmd.CombinedOutput(); err != nil {
@@ -159,9 +204,12 @@ func remoteFlashCmd(args []string) {
 		}
 		destPath = filepath.Join(*hostTmpDir, fmt.Sprintf("%s.img", *node))
 	}
-	if err := writer.WriteImage(context.Background(), rawImage, destPath); err != nil {
+	emitStageProgress("flash", 92, fmt.Sprintf("Writing the latest image for %s to %s", *node, destPath))
+	writeEmitter := newProgressEmitter("flash", 92, 98, fmt.Sprintf("Writing the latest image for %s", *node), true)
+	if err := writer.WriteImageWithProgress(context.Background(), rawImage, destPath, writeEmitter); err != nil {
 		fatalf("write image: %v", err)
 	}
+	emitStageProgress("flash", 99, fmt.Sprintf("Flushing the finished image for %s", *node))
 	_ = exec.Command("sync").Run()
 	if strings.HasPrefix(destPath, "/dev/") {
 		_ = exec.Command("blockdev", "--flushbufs", destPath).Run()
@@ -192,6 +240,71 @@ func writeStructuredResult(payload any) {
 	_ = os.WriteFile("/dev/termination-log", data, 0o644)
 }
 
+// emitStageProgress prints a progress marker that carries only a stage,
+// a percentage and a message.
+func emitStageProgress(stage string, progress float64, message string) {
+	emitProgress(service.RemoteProgressUpdate{
+		Stage:       stage,
+		ProgressPct: progress,
+		Message:     message,
+	})
+}
+
+// emitProgress writes update to stdout as a single progress log line. An
+// update that formats to a blank line is dropped.
+func emitProgress(update service.RemoteProgressUpdate) {
+	line := service.ProgressLogLine(update)
+	if strings.TrimSpace(line) == "" {
+		return
+	}
+	fmt.Fprintln(os.Stdout, line)
+}
+
+// newProgressEmitter returns a writer.ProgressFunc that maps bytes written onto
+// the minPct..maxPct range of stage and prints the result with emitProgress.
+//
+// Output is throttled: an update is printed only once progress has moved at
+// least half a point or a second has passed since the last one. The first call
+// that reports completion (written >= total) bypasses the throttle so the final
+// percentage and byte counts are never dropped. Calls with a non-positive total
+// are ignored. When includeBytes is set, the update also carries written and
+// total. The throttle state is guarded by a mutex.
+func newProgressEmitter(stage string, minPct, maxPct float64, message string, includeBytes bool) writer.ProgressFunc {
+	var mu sync.Mutex
+	lastPct := minPct
+	lastEmit := time.Time{}
+	return func(written, total int64) {
+		if total <= 0 {
+			return
+		}
+		pct := minPct + (float64(written)/float64(total))*(maxPct-minPct)
+		// Clamp overshoot and pin completion to maxPct exactly, so float
+		// rounding cannot leave the final update just short of it.
+		if pct > maxPct || written >= total {
+			pct = maxPct
+		}
+		mu.Lock()
+		defer mu.Unlock()
+		now := time.Now()
+		completing := written >= total && lastPct < maxPct
+		if !completing && pct-lastPct < 0.5 && now.Sub(lastEmit) < time.Second {
+			return
+		}
+		update := service.RemoteProgressUpdate{
+			Stage:       stage,
+			ProgressPct: pct,
+			Message:     message,
+		}
+		if includeBytes {
+			update.WrittenBytes = written
+			update.TotalBytes = total
+		}
+		emitProgress(update)
+		lastPct = pct
+		lastEmit = now
+	}
+}
+
 func localFlashDevices(maxBytes int64, hostTmpDir string) ([]service.Device, error) {
 	cmd := exec.Command("lsblk", "-J", "-b", "-o", "NAME,PATH,RM,HOTPLUG,SIZE,MODEL,TRAN,TYPE")
 	out, err := cmd.Output()
diff --git a/pkg/image/rootfs.go b/pkg/image/rootfs.go
index 1cedad2..6102a30 100644
--- a/pkg/image/rootfs.go
+++ b/pkg/image/rootfs.go
@@ -29,9 +29,25 @@ type partitionTablePart struct {
 	Type string `json:"type"`
 }
 
+// RootFSProgressFunc receives the name of each rootfs injection step as it starts.
+type RootFSProgressFunc func(step string)
+
+// Step names passed to a RootFSProgressFunc, in the order they are reported.
+const (
+	RootFSProgressFindingPartition = "finding-partition"
+	RootFSProgressExtracting       = "extracting-partition"
+	RootFSProgressWritingFiles     = "writing-rootfs-files"
+	RootFSProgressReplacing        = "replacing-partition"
+)
+
 // InjectRootFS rewrites the Linux root partition inside a raw image file without
 // requiring block-device mounts. Only rootfs-targeted files are written.
func InjectRootFS(imagePath string, files []inject.FileSpec) error {
+	return InjectRootFSWithProgress(imagePath, files, nil)
+}
+
+// InjectRootFSWithProgress emits coarse step changes while rewriting the root partition.
+func InjectRootFSWithProgress(imagePath string, files []inject.FileSpec, progress RootFSProgressFunc) error {
 	rootFiles := make([]inject.FileSpec, 0, len(files))
 	for _, f := range files {
 		if f.RootFS {
@@ -42,6 +56,7 @@ func InjectRootFS(imagePath string, files []inject.FileSpec) error {
 		return nil
 	}
 
+	emitRootFSProgress(progress, RootFSProgressFindingPartition)
 	part, sectorSize, err := findLinuxPartition(imagePath)
 	if err != nil {
 		return err
@@ -54,15 +69,26 @@ func InjectRootFS(imagePath string, files []inject.FileSpec) error {
 	defer os.RemoveAll(workDir)
 
 	rootImage := filepath.Join(workDir, "root.ext4")
+	emitRootFSProgress(progress, RootFSProgressExtracting)
 	if err := extractPartition(imagePath, rootImage, part, sectorSize); err != nil {
 		return err
 	}
+	emitRootFSProgress(progress, RootFSProgressWritingFiles)
 	if err := writeExt4Files(rootImage, rootFiles); err != nil {
 		return err
 	}
+	emitRootFSProgress(progress, RootFSProgressReplacing)
 	return replacePartition(imagePath, rootImage, part, sectorSize)
 }
 
+// emitRootFSProgress hands step to progress; a nil callback makes it a no-op.
+func emitRootFSProgress(progress RootFSProgressFunc, step string) {
+	if progress == nil {
+		return
+	}
+	progress(step)
+}
+
 func findLinuxPartition(imagePath string) (partitionTablePart, uint64, error) {
 	out, err := exec.Command("sfdisk", "-J", imagePath).Output()
 	if err != nil {
diff --git a/pkg/service/cluster.go b/pkg/service/cluster.go
index 5561068..0e58e6f 100644
--- a/pkg/service/cluster.go
+++ b/pkg/service/cluster.go
@@ -231,7 +231,11 @@ func (a *App) runRemotePod(jobID, podName string, podSpec map[string]any) (strin
 		return "", err
 	}
 
-	deadline := time.Now().Add(12 * time.Minute)
+	timeout := time.Duration(a.settings.RemotePodTimeout) * time.Second
+	if timeout < 5*time.Minute {
+		timeout = 5 * time.Minute
+	}
+	deadline :=
time.Now().Add(timeout) lastState := podState{Name: podName} for time.Now().Before(deadline) { state, err := a.remotePodState(kube, podName) @@ -240,6 +244,11 @@ func (a *App) runRemotePod(jobID, podName string, podSpec map[string]any) (strin } lastState = state if strings.TrimSpace(jobID) != "" { + if logs, logErr := a.remotePodLogs(kube, podName); logErr == nil { + if update, ok := parseRemoteProgressLogs(logs); ok { + a.applyRemoteProgress(jobID, update) + } + } a.heartbeatRemoteJob(jobID) } switch state.Phase { diff --git a/pkg/service/helpers_test.go b/pkg/service/helpers_test.go index a6e6a57..9729c3b 100644 --- a/pkg/service/helpers_test.go +++ b/pkg/service/helpers_test.go @@ -22,6 +22,7 @@ func TestSettingsHelpersAndSmallUtilities(t *testing.T) { t.Setenv("METIS_MAX_DEVICE_BYTES", "12345") t.Setenv("METIS_DEFAULT_FLASH_HOST", "flash-1") t.Setenv("METIS_LOCAL_HOST", "local-1") + t.Setenv("METIS_REMOTE_POD_TIMEOUT_SEC", "1800") settings := FromEnv() if got, want := settings.CacheDir, filepath.Join(dataDir, "cache"); got != want { @@ -33,6 +34,9 @@ func TestSettingsHelpersAndSmallUtilities(t *testing.T) { if settings.MaxDeviceBytes != 12345 { t.Fatalf("expected MaxDeviceBytes=12345, got %d", settings.MaxDeviceBytes) } + if settings.RemotePodTimeout != 1800 { + t.Fatalf("expected RemotePodTimeout=1800, got %d", settings.RemotePodTimeout) + } if !reflect.DeepEqual(splitList("a, b,, c"), []string{"a", "b", "c"}) { t.Fatalf("splitList mismatch") } diff --git a/pkg/service/remote.go b/pkg/service/remote.go index d53ea03..1bc93ca 100644 --- a/pkg/service/remote.go +++ b/pkg/service/remote.go @@ -270,6 +270,17 @@ func (a *App) heartbeatRemoteJob(jobID string) { } j.Message = fmt.Sprintf("Validating %s on %s and resolving the latest Harbor artifact", prettyDeviceTarget(j.Device), j.Host) case "flash": + if j.Total > 0 && j.Written > 0 { + actual := 88 + (float64(j.Written)/float64(j.Total))*10 + if actual > 98 { + actual = 98 + } + if actual > j.ProgressPct { + 
j.ProgressPct = actual
+				}
+				j.Message = fmt.Sprintf("Writing %s of %s on %s", humanBytes(j.Written), humanBytes(j.Total), j.Host)
+				return
+			}
 			progress, message := flashStageHeartbeat(j.Host, j.Artifact, elapsed)
 			if progress > j.ProgressPct {
 				j.ProgressPct = progress
diff --git a/pkg/service/remote_progress_test.go b/pkg/service/remote_progress_test.go
index 3ad6885..4750832 100644
--- a/pkg/service/remote_progress_test.go
+++ b/pkg/service/remote_progress_test.go
@@ -43,3 +43,34 @@ func TestFlashStageHeartbeatProgresses(t *testing.T) {
 		t.Fatalf("expected flushing message, got %q", m3)
 	}
 }
+
+func TestParseRemoteProgressLogsFindsLatestMarker(t *testing.T) {
+	logs := strings.Join([]string{
+		"plain log line",
+		ProgressLogLine(RemoteProgressUpdate{Stage: "build", ProgressPct: 40, Message: "phase one"}),
+		ProgressLogLine(RemoteProgressUpdate{Stage: "build", ProgressPct: 72, Message: "phase two"}),
+		`{"local_path":"/tmp/out.img.xz"}`,
+	}, "\n")
+	update, ok := parseRemoteProgressLogs(logs)
+	if !ok {
+		t.Fatal("expected progress marker")
+	}
+	if update.ProgressPct != 72 || update.Message != "phase two" {
+		t.Fatalf("unexpected update: %#v", update)
+	}
+}
+
+func TestParseRemoteProgressLogsSurvivesOversizedLine(t *testing.T) {
+	logs := strings.Join([]string{
+		ProgressLogLine(RemoteProgressUpdate{Stage: "build", ProgressPct: 40, Message: "phase one"}),
+		strings.Repeat("x", 2<<20),
+		ProgressLogLine(RemoteProgressUpdate{Stage: "build", ProgressPct: 72, Message: "phase two"}),
+	}, "\n")
+	update, ok := parseRemoteProgressLogs(logs)
+	if !ok {
+		t.Fatal("expected progress marker")
+	}
+	if update.ProgressPct != 72 || update.Message != "phase two" {
+		t.Fatalf("unexpected update: %#v", update)
+	}
+}
diff --git a/pkg/service/remote_status.go b/pkg/service/remote_status.go
new file mode 100644
index 0000000..91aa742
--- /dev/null
+++ b/pkg/service/remote_status.go
@@ -0,0 +1,90 @@
+package service
+
+import (
+	"encoding/json"
+	"strings"
+	"time"
+)
+
+// progressLogPrefix marks worker stdout lines that carry a JSON-encoded RemoteProgressUpdate.
+const progressLogPrefix = "METIS_PROGRESS "
+
+// RemoteProgressUpdate is emitted by remote workers so the UI can show
+// concrete stage transitions instead of relying only on elapsed-time guesses.
+type RemoteProgressUpdate struct {
+	Stage        string  `json:"stage,omitempty"`
+	ProgressPct  float64 `json:"progress_pct,omitempty"`
+	Message      string  `json:"message,omitempty"`
+	WrittenBytes int64   `json:"written_bytes,omitempty"`
+	TotalBytes   int64   `json:"total_bytes,omitempty"`
+}
+
+// ProgressLogLine formats a progress update for remote worker stdout.
+// It returns "" if the update cannot be marshalled.
+func ProgressLogLine(update RemoteProgressUpdate) string {
+	data, err := json.Marshal(update)
+	if err != nil {
+		return ""
+	}
+	return progressLogPrefix + string(data)
+}
+
+// parseRemoteProgressLogs returns the newest well-formed progress marker found
+// in logs and reports whether there was one.
+//
+// Lines are examined last-to-first so the search ends at the first marker that
+// decodes. They are split by hand instead of with bufio.Scanner: a scanner
+// stops at the first line above its buffer limit, which would hide every
+// marker printed after an oversized log line.
+func parseRemoteProgressLogs(logs string) (RemoteProgressUpdate, bool) {
+	for rest := logs; rest != ""; {
+		line := rest
+		if i := strings.LastIndexByte(rest, '\n'); i >= 0 {
+			rest, line = rest[:i], rest[i+1:]
+		} else {
+			rest = ""
+		}
+		line = strings.TrimSpace(line)
+		if !strings.HasPrefix(line, progressLogPrefix) {
+			continue
+		}
+		raw := strings.TrimSpace(strings.TrimPrefix(line, progressLogPrefix))
+		var update RemoteProgressUpdate
+		if err := json.Unmarshal([]byte(raw), &update); err != nil {
+			// Not a usable marker; fall back to an older line.
+			continue
+		}
+		return update, true
+	}
+	return RemoteProgressUpdate{}, false
+}
+
+// applyRemoteProgress merges a worker-reported update into the job identified
+// by jobID while it is running. Progress only moves forward, a stage change
+// restarts the stage clock, and empty or zero fields leave the job untouched.
+func (a *App) applyRemoteProgress(jobID string, update RemoteProgressUpdate) {
+	if strings.TrimSpace(jobID) == "" {
+		return
+	}
+	a.setJob(jobID, func(j *Job) {
+		if j == nil || j.Status != JobRunning {
+			return
+		}
+		if stage := strings.TrimSpace(update.Stage); stage != "" && stage != j.Stage {
+			j.Stage = stage
+			j.StageStartedAt = time.Now().UTC()
+		}
+		if update.ProgressPct > j.ProgressPct {
+			j.ProgressPct = update.ProgressPct
+		}
+		if message := strings.TrimSpace(update.Message); message != "" {
+			j.Message = message
+		}
+		if update.WrittenBytes > 0 {
+			j.Written = update.WrittenBytes
+		}
+		if update.TotalBytes > 0 {
+			j.Total = update.TotalBytes
+		}
+	})
+}
diff --git a/pkg/service/settings.go b/pkg/service/settings.go
index dacecd2..85026b1 100644
--- a/pkg/service/settings.go
+++ b/pkg/service/settings.go
@@ -33,6 +33,7 @@ type Settings struct {
 	HarborUsername   string
 	HarborPassword   string
 	HostTmpDir       string
+	RemotePodTimeout int64
 }
 
 // FromEnv builds service settings with sensible defaults for local dev and in-cluster use.
@@ -64,6 +65,7 @@ func FromEnv() Settings { HarborUsername: getenvDefault("METIS_HARBOR_USERNAME", ""), HarborPassword: getenvDefault("METIS_HARBOR_PASSWORD", ""), HostTmpDir: getenvDefault("METIS_HOST_TMP_DIR", "/tmp/metis-flash-test"), + RemotePodTimeout: getenvInt64("METIS_REMOTE_POD_TIMEOUT_SEC", 1800), } }