diff --git a/cmd/arduino-app-cli/system/system.go b/cmd/arduino-app-cli/system/system.go index c23d5317e..048b41912 100644 --- a/cmd/arduino-app-cli/system/system.go +++ b/cmd/arduino-app-cli/system/system.go @@ -113,11 +113,14 @@ func newUpdateCmd() *cobra.Command { events := updater.Subscribe() for event := range events { - if event.Type == update.ErrorEvent { + switch event.Type { + case update.ErrorEvent: // TODO: add colors to error messages err := event.GetError() feedback.Printf("Error: %s [%s]", err.Error(), update.GetUpdateErrorCode(err)) - } else { + case update.ProgressEvent: + feedback.Printf("[%s] %.2f", event.Type.String(), event.GetProgress()) + default: feedback.Printf("[%s] %s", event.Type.String(), event.GetData()) } diff --git a/internal/api/handlers/update.go b/internal/api/handlers/update.go index 49ec1b294..91e7e6f6c 100644 --- a/internal/api/handlers/update.go +++ b/internal/api/handlers/update.go @@ -152,7 +152,8 @@ func HandleUpdateEvents(updater *update.Manager) http.HandlerFunc { slog.Info("APT event channel closed, stopping SSE stream") return } - if event.Type == update.ErrorEvent { + switch event.Type { + case update.ErrorEvent: err := event.GetError() code := render.InternalServiceErr if c := update.GetUpdateErrorCode(err); c != update.UnknownErrorCode { @@ -162,7 +163,12 @@ func HandleUpdateEvents(updater *update.Manager) http.HandlerFunc { Code: code, Message: err.Error(), }) - } else { + case update.ProgressEvent: + sseStream.Send(render.SSEEvent{ + Type: event.Type.String(), + Data: event.GetProgress(), + }) + default: sseStream.Send(render.SSEEvent{ Type: event.Type.String(), Data: event.GetData(), diff --git a/internal/update/apt/service.go b/internal/update/apt/service.go index 860be68f6..90492c13d 100644 --- a/internal/update/apt/service.go +++ b/internal/update/apt/service.go @@ -78,12 +78,12 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u return nil, update.ErrOperationAlreadyInProgress } eventsCh := make(chan update.Event, 100) - go func() { defer s.lock.Unlock() defer close(eventsCh) eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting") + eventsCh <- update.NewProgressEvent(0.0) stream := runUpgradeCommand(ctx, names) for line, err := range stream { if err != nil { @@ -92,16 +92,17 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u } eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line) } - eventsCh <- update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting") + eventsCh <- update.NewProgressEvent(80.0) for line, err := range runAptCleanCommand(ctx) { if err != nil { eventsCh <- update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err)) return } + eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line) } - + eventsCh <- update.NewProgressEvent(85.0) eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ....") streamCleanup := cleanupDockerContainers(ctx) for line, err := range streamCleanup { @@ -113,6 +114,7 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line) } } + eventsCh <- update.NewProgressEvent(90.0) // TODO: Remove this workaround once docker image versions are no longer hardcoded in arduino-app-cli. // Tracking issue: https://github.com/arduino/arduino-app-cli/issues/600 @@ -128,7 +130,7 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line) } eventsCh <- update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ...") - + eventsCh <- update.NewProgressEvent(100.0) err := restartServices(ctx) if err != nil { eventsCh <- update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err)) diff --git a/internal/update/arduino/arduino.go b/internal/update/arduino/arduino.go index 0c4e5d299..b6757a477 100644 --- a/internal/update/arduino/arduino.go +++ b/internal/update/arduino/arduino.go @@ -131,24 +131,44 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st } eventsCh := make(chan update.Event, 100) - downloadProgressCB := func(curr *rpc.DownloadProgress) { - data := helpers.ArduinoCLIDownloadProgressToString(curr) - slog.Debug("Download progress", slog.String("download_progress", data)) - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data) - } - taskProgressCB := func(msg *rpc.TaskProgress) { - data := helpers.ArduinoCLITaskProgressToString(msg) - slog.Debug("Task progress", slog.String("task_progress", data)) - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data) - } - go func() { defer a.lock.Unlock() defer close(eventsCh) + const indexBase float32 = 0.0 + const indexWeight float32 = 30.0 + const upgradeBase float32 = 30.0 + const upgradeWeight float32 = 60.0 + + makeDownloadProgressCallback := func(basePercentage, phaseWeight float32) func(*rpc.DownloadProgress) { + return func(curr *rpc.DownloadProgress) { + data := helpers.ArduinoCLIDownloadProgressToString(curr) + eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data) + if updateInfo := curr.GetUpdate(); updateInfo != nil { + if updateInfo.GetTotalSize() <= 0 { + return + } + localProgress := (float32(updateInfo.GetDownloaded()) / float32(updateInfo.GetTotalSize())) * 100.0 + totalArduinoProgress := basePercentage + (localProgress/100.0)*phaseWeight + eventsCh <- update.NewProgressEvent(totalArduinoProgress) + } + } + } + makeTaskProgressCallback := func(basePercentage, phaseWeight float32) func(*rpc.TaskProgress) { + return func(msg *rpc.TaskProgress) { + data := helpers.ArduinoCLITaskProgressToString(msg) + eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data) + if !msg.GetCompleted() { + localProgress := msg.GetPercent() + totalArduinoProgress := basePercentage + (localProgress/100.0)*phaseWeight + eventsCh <- update.NewProgressEvent(totalArduinoProgress) + } + } + } + eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting") - logrus.SetLevel(logrus.ErrorLevel) // Reduce the log level of arduino-cli + logrus.SetLevel(logrus.ErrorLevel) srv := commands.NewArduinoCoreServer() if err := setConfig(ctx, srv); err != nil { @@ -172,21 +192,28 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st }() { - stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, downloadProgressCB) + updateIndexProgressCB := makeDownloadProgressCallback(indexBase, indexWeight) + stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, updateIndexProgressCB) if err := srv.UpdateIndex(&rpc.UpdateIndexRequest{Instance: inst}, stream); err != nil { eventsCh <- update.NewErrorEvent(fmt.Errorf("error updating index: %w", err)) return } + + eventsCh <- update.NewProgressEvent(indexBase + indexWeight) + if err := srv.Init(&rpc.InitRequest{Instance: inst}, commands.InitStreamResponseToCallbackFunction(ctx, nil)); err != nil { eventsCh <- update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err)) return } } + platformDownloadCB := makeDownloadProgressCallback(upgradeBase, upgradeWeight) + platformTaskCB := makeTaskProgressCallback(upgradeBase, upgradeWeight) + stream, respCB := commands.PlatformUpgradeStreamResponseToCallbackFunction( ctx, - downloadProgressCB, - taskProgressCB, + platformDownloadCB, + platformTaskCB, ) if err := srv.PlatformUpgrade( &rpc.PlatformUpgradeRequest{ @@ -218,8 +245,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st }, commands.PlatformInstallStreamResponseToCallbackFunction( ctx, - downloadProgressCB, - taskProgressCB, + platformDownloadCB, + platformTaskCB, ), ) if err != nil { @@ -247,6 +274,7 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st eventsCh <- update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err)) return } + eventsCh <- update.NewProgressEvent(100.0) }() return eventsCh, nil diff --git a/internal/update/event.go b/internal/update/event.go index 2aac04ee9..2c6016fa0 100644 --- a/internal/update/event.go +++ b/internal/update/event.go @@ -15,7 +15,9 @@ package update -import "go.bug.st/f" +import ( + "go.bug.st/f" +) // EventType defines the type of upgrade event. type EventType int @@ -24,16 +26,17 @@ const ( UpgradeLineEvent EventType = iota StartEvent RestartEvent + ProgressEvent DoneEvent ErrorEvent ) // Event represents a single event in the upgrade process. type Event struct { - Type EventType - - data string - err error // error field for error events + Type EventType + progress float32 + data string + err error // error field for error events } func (t EventType) String() string { @@ -44,6 +47,8 @@ func (t EventType) String() string { return "restarting" case StartEvent: return "starting" + case ProgressEvent: + return "progress" case DoneEvent: return "done" case ErrorEvent: @@ -60,6 +65,13 @@ func NewDataEvent(t EventType, data string) Event { } } +func NewProgressEvent(progress float32) Event { + return Event{ + Type: ProgressEvent, + progress: progress, + } +} + func NewErrorEvent(err error) Event { return Event{ Type: ErrorEvent, @@ -77,6 +89,11 @@ func (e Event) GetError() error { return e.err } +func (e Event) GetProgress() float32 { + f.Assert(e.Type == ProgressEvent, "not a progress event") + return e.progress +} + type PackageType string const ( diff --git a/internal/update/update.go b/internal/update/update.go index 7e3bc99c2..c683105bb 100644 --- a/internal/update/update.go +++ b/internal/update/update.go @@ -136,24 +136,37 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage) // update of the cores we will end up with inconsistent state, or // we need to re run the upgrade because the orchestrator interrupted // in the middle the upgrade of the cores. + + const arduinoWeight float32 = 20.0 + const aptWeight float32 = 80.0 + arduinoEvents, err := m.arduinoPlatformUpdateService.UpgradePackages(ctx, arduinoPlatform) if err != nil { m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade Arduino packages: %w", err))) return } for e := range arduinoEvents { - m.broadcast(e) + if e.Type == ProgressEvent { + globalProgress := (e.progress / 100.0) * arduinoWeight + m.broadcast(NewProgressEvent(globalProgress)) + } else { + m.broadcast(e) + } } - aptEvents, err := m.debUpdateService.UpgradePackages(ctx, debPkgs) if err != nil { m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade APT packages: %w", err))) return } for e := range aptEvents { - m.broadcast(e) + if e.Type == ProgressEvent { + globalProgress := arduinoWeight + (e.progress/100.0)*aptWeight + m.broadcast(NewProgressEvent(globalProgress)) + } else { + m.broadcast(e) + } } - + m.broadcast(NewProgressEvent(100.0)) m.broadcast(NewDataEvent(DoneEvent, "Update completed")) }() return nil