metis/pkg/service/remote_status.go

78 lines
2.0 KiB
Go

package service
import (
"bufio"
"encoding/json"
"strings"
"time"
)
const progressLogPrefix = "METIS_PROGRESS "
// RemoteProgressUpdate is emitted by remote workers so the UI can show
// concrete stage transitions instead of relying only on elapsed-time guesses.
type RemoteProgressUpdate struct {
Stage string `json:"stage,omitempty"`
ProgressPct float64 `json:"progress_pct,omitempty"`
Message string `json:"message,omitempty"`
WrittenBytes int64 `json:"written_bytes,omitempty"`
TotalBytes int64 `json:"total_bytes,omitempty"`
}
// ProgressLogLine formats a progress update for remote worker stdout.
func ProgressLogLine(update RemoteProgressUpdate) string {
data, err := json.Marshal(update)
if err != nil {
return ""
}
return progressLogPrefix + string(data)
}
func parseRemoteProgressLogs(logs string) (RemoteProgressUpdate, bool) {
scanner := bufio.NewScanner(strings.NewReader(logs))
scanner.Buffer(make([]byte, 0, 4096), 1<<20)
var latest RemoteProgressUpdate
found := false
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if !strings.HasPrefix(line, progressLogPrefix) {
continue
}
raw := strings.TrimSpace(strings.TrimPrefix(line, progressLogPrefix))
var update RemoteProgressUpdate
if err := json.Unmarshal([]byte(raw), &update); err != nil {
continue
}
latest = update
found = true
}
return latest, found
}
func (a *App) applyRemoteProgress(jobID string, update RemoteProgressUpdate) {
if strings.TrimSpace(jobID) == "" {
return
}
a.setJob(jobID, func(j *Job) {
if j == nil || j.Status != JobRunning {
return
}
if stage := strings.TrimSpace(update.Stage); stage != "" && stage != j.Stage {
j.Stage = stage
j.StageStartedAt = time.Now().UTC()
}
if update.ProgressPct > j.ProgressPct {
j.ProgressPct = update.ProgressPct
}
if message := strings.TrimSpace(update.Message); message != "" {
j.Message = message
}
if update.WrittenBytes > 0 {
j.Written = update.WrittenBytes
}
if update.TotalBytes > 0 {
j.Total = update.TotalBytes
}
})
}