package service import ( "bufio" "encoding/json" "strings" "time" ) const progressLogPrefix = "METIS_PROGRESS " // RemoteProgressUpdate is emitted by remote workers so the UI can show // concrete stage transitions instead of relying only on elapsed-time guesses. type RemoteProgressUpdate struct { Stage string `json:"stage,omitempty"` ProgressPct float64 `json:"progress_pct,omitempty"` Message string `json:"message,omitempty"` WrittenBytes int64 `json:"written_bytes,omitempty"` TotalBytes int64 `json:"total_bytes,omitempty"` } // ProgressLogLine formats a progress update for remote worker stdout. func ProgressLogLine(update RemoteProgressUpdate) string { data, err := json.Marshal(update) if err != nil { return "" } return progressLogPrefix + string(data) } func parseRemoteProgressLogs(logs string) (RemoteProgressUpdate, bool) { scanner := bufio.NewScanner(strings.NewReader(logs)) scanner.Buffer(make([]byte, 0, 4096), 1<<20) var latest RemoteProgressUpdate found := false for scanner.Scan() { line := strings.TrimSpace(scanner.Text()) if !strings.HasPrefix(line, progressLogPrefix) { continue } raw := strings.TrimSpace(strings.TrimPrefix(line, progressLogPrefix)) var update RemoteProgressUpdate if err := json.Unmarshal([]byte(raw), &update); err != nil { continue } latest = update found = true } return latest, found } func (a *App) applyRemoteProgress(jobID string, update RemoteProgressUpdate) { if strings.TrimSpace(jobID) == "" { return } a.setJob(jobID, func(j *Job) { if j == nil || j.Status != JobRunning { return } if stage := strings.TrimSpace(update.Stage); stage != "" && stage != j.Stage { j.Stage = stage j.StageStartedAt = time.Now().UTC() } if update.ProgressPct > j.ProgressPct { j.ProgressPct = update.ProgressPct } if message := strings.TrimSpace(update.Message); message != "" { j.Message = message } if update.WrittenBytes > 0 { j.Written = update.WrittenBytes } if update.TotalBytes > 0 { j.Total = update.TotalBytes } }) }