diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index 626b822b1..327c0defc 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -467,11 +467,11 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter } else { progressPrinter = backup.NewTextProgress(term, gopts.verbosity) } - progressReporter := backup.NewProgress(progressPrinter) + progressReporter := backup.NewProgress(progressPrinter, + calculateProgressInterval(!gopts.Quiet, gopts.JSON)) if opts.DryRun { repo.SetDryRun() - progressReporter.SetDryRun() } // use the terminal for stdout/stderr @@ -481,12 +481,10 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter }() gopts.stdout, gopts.stderr = progressPrinter.Stdout(), progressPrinter.Stderr() - progressReporter.SetMinUpdatePause(calculateProgressInterval(!gopts.Quiet, gopts.JSON)) - wg, wgCtx := errgroup.WithContext(ctx) cancelCtx, cancel := context.WithCancel(wgCtx) defer cancel() - wg.Go(func() error { return progressReporter.Run(cancelCtx) }) + wg.Go(func() error { progressReporter.Run(cancelCtx); return nil }) if !gopts.JSON { progressPrinter.V("lock repository") @@ -588,7 +586,7 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter sc := archiver.NewScanner(targetFS) sc.SelectByName = selectByNameFilter sc.Select = selectFilter - sc.Error = progressReporter.ScannerError + sc.Error = progressPrinter.ScannerError sc.Result = progressReporter.ReportTotal if !gopts.JSON { @@ -643,7 +641,7 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter } // Report finished execution - progressReporter.Finish(id) + progressReporter.Finish(id, opts.DryRun) if !gopts.JSON && !opts.DryRun { progressPrinter.P("snapshot %s saved\n", id.Str()) } diff --git a/internal/ui/backup/progress.go b/internal/ui/backup/progress.go index d5297eb2e..dca8a6987 100644 --- a/internal/ui/backup/progress.go +++ b/internal/ui/backup/progress.go @@ -11,6 +11,8 @@ import ( "github.com/restic/restic/internal/ui/signals" ) +// A ProgressPrinter can print various progress messages. +// It must be safe to call its methods from concurrent goroutines. type ProgressPrinter interface { Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) Error(item string, err error) error @@ -24,23 +26,15 @@ type ProgressPrinter interface { Stdout() io.WriteCloser Stderr() io.WriteCloser - E(msg string, args ...interface{}) P(msg string, args ...interface{}) V(msg string, args ...interface{}) - VV(msg string, args ...interface{}) } type Counter struct { Files, Dirs, Bytes uint64 } -type fileWorkerMessage struct { - filename string - done bool -} - type Summary struct { - sync.Mutex Files, Dirs struct { New uint Changed uint @@ -52,37 +46,30 @@ type Summary struct { // Progress reports progress for the `backup` command. type Progress struct { - MinUpdatePause time.Duration + mu sync.Mutex - start time.Time - dry bool + interval time.Duration + start time.Time - totalCh chan Counter - processedCh chan Counter - errCh chan struct{} - workerCh chan fileWorkerMessage - closed chan struct{} + scanStarted, scanFinished bool - summary *Summary + currentFiles map[string]struct{} + processed, total Counter + errors uint + + closed chan struct{} + + summary Summary printer ProgressPrinter } -func NewProgress(printer ProgressPrinter) *Progress { +func NewProgress(printer ProgressPrinter, interval time.Duration) *Progress { return &Progress{ - // limit to 60fps by default - MinUpdatePause: time.Second / 60, - start: time.Now(), + interval: interval, + start: time.Now(), - // use buffered channels for the information used to update the status - // the shutdown of the `Run()` method is somewhat racy, but won't affect - // the final backup statistics - totalCh: make(chan Counter, 100), - processedCh: make(chan Counter, 100), - errCh: make(chan struct{}), - workerCh: make(chan fileWorkerMessage, 100), - closed: make(chan struct{}), - - summary: &Summary{}, + currentFiles: make(map[string]struct{}), + closed: make(chan struct{}), printer: printer, } @@ -90,111 +77,83 @@ func NewProgress(printer ProgressPrinter) *Progress { // Run regularly updates the status lines. It should be called in a separate // goroutine. -func (p *Progress) Run(ctx context.Context) error { - var ( - lastUpdate time.Time - total, processed Counter - errors uint - started bool - currentFiles = make(map[string]struct{}) - secondsRemaining uint64 - ) - - t := time.NewTicker(time.Second) - signalsCh := signals.GetProgressChannel() - defer t.Stop() +func (p *Progress) Run(ctx context.Context) { defer close(p.closed) // Reset status when finished defer p.printer.Reset() + var tick <-chan time.Time + if p.interval != 0 { + t := time.NewTicker(p.interval) + defer t.Stop() + tick = t.C + } + + signalsCh := signals.GetProgressChannel() + for { - forceUpdate := false + var now time.Time select { case <-ctx.Done(): - return nil - case t, ok := <-p.totalCh: - if ok { - total = t - started = true - } else { - // scan has finished - p.totalCh = nil - } - case s := <-p.processedCh: - processed.Files += s.Files - processed.Dirs += s.Dirs - processed.Bytes += s.Bytes - started = true - case <-p.errCh: - errors++ - started = true - case m := <-p.workerCh: - if m.done { - delete(currentFiles, m.filename) - } else { - currentFiles[m.filename] = struct{}{} - } - case <-t.C: - if !started { - continue - } - - if p.totalCh == nil { - secs := float64(time.Since(p.start) / time.Second) - todo := float64(total.Bytes - processed.Bytes) - secondsRemaining = uint64(secs / float64(processed.Bytes) * todo) - } + return + case now = <-tick: case <-signalsCh: - forceUpdate = true + now = time.Now() } - // limit update frequency - if !forceUpdate && (time.Since(lastUpdate) < p.MinUpdatePause || p.MinUpdatePause == 0) { + p.mu.Lock() + if p.scanStarted { + p.mu.Unlock() continue } - lastUpdate = time.Now() - p.printer.Update(total, processed, errors, currentFiles, p.start, secondsRemaining) + var secondsRemaining uint64 + if p.scanFinished { + secs := float64(now.Sub(p.start) / time.Second) + todo := float64(p.total.Bytes - p.processed.Bytes) + secondsRemaining = uint64(secs / float64(p.processed.Bytes) * todo) + } + + p.printer.Update(p.total, p.processed, p.errors, p.currentFiles, p.start, secondsRemaining) + p.mu.Unlock() } } -// ScannerError is the error callback function for the scanner, it prints the -// error in verbose mode and returns nil. -func (p *Progress) ScannerError(item string, err error) error { - return p.printer.ScannerError(item, err) -} - // Error is the error callback function for the archiver, it prints the error and returns nil. func (p *Progress) Error(item string, err error) error { - cbErr := p.printer.Error(item, err) + p.mu.Lock() + p.errors++ + p.scanStarted = true + p.mu.Unlock() - select { - case p.errCh <- struct{}{}: - case <-p.closed: - } - return cbErr + return p.printer.Error(item, err) } // StartFile is called when a file is being processed by a worker. func (p *Progress) StartFile(filename string) { - select { - case p.workerCh <- fileWorkerMessage{filename: filename}: - case <-p.closed: - } + p.mu.Lock() + defer p.mu.Unlock() + p.currentFiles[filename] = struct{}{} +} + +func (p *Progress) addProcessed(c Counter) { + p.processed.Files += c.Files + p.processed.Dirs += c.Dirs + p.processed.Bytes += c.Bytes + p.scanStarted = true } // CompleteBlob is called for all saved blobs for files. func (p *Progress) CompleteBlob(bytes uint64) { - select { - case p.processedCh <- Counter{Bytes: bytes}: - case <-p.closed: - } + p.mu.Lock() + p.addProcessed(Counter{Bytes: bytes}) + p.mu.Unlock() } // CompleteItem is the status callback function for the archiver when a // file/dir has been saved successfully. func (p *Progress) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) { - p.summary.Lock() + p.mu.Lock() p.summary.ItemStats.Add(s) // for the last item "/", current is nil @@ -202,110 +161,87 @@ func (p *Progress) CompleteItem(item string, previous, current *restic.Node, s a p.summary.ProcessedBytes += current.Size } - p.summary.Unlock() + p.mu.Unlock() if current == nil { // error occurred, tell the status display to remove the line - select { - case p.workerCh <- fileWorkerMessage{filename: item, done: true}: - case <-p.closed: - } + p.mu.Lock() + delete(p.currentFiles, item) + p.mu.Unlock() return } switch current.Type { - case "file": - select { - case p.processedCh <- Counter{Files: 1}: - case <-p.closed: - } - select { - case p.workerCh <- fileWorkerMessage{filename: item, done: true}: - case <-p.closed: - } case "dir": - select { - case p.processedCh <- Counter{Dirs: 1}: - case <-p.closed: - } - } + p.mu.Lock() + p.addProcessed(Counter{Dirs: 1}) + p.mu.Unlock() - if current.Type == "dir" { - if previous == nil { + switch { + case previous == nil: p.printer.CompleteItem("dir new", item, previous, current, s, d) - p.summary.Lock() + p.mu.Lock() p.summary.Dirs.New++ - p.summary.Unlock() - return - } + p.mu.Unlock() - if previous.Equals(*current) { + case previous.Equals(*current): p.printer.CompleteItem("dir unchanged", item, previous, current, s, d) - p.summary.Lock() + p.mu.Lock() p.summary.Dirs.Unchanged++ - p.summary.Unlock() - } else { + p.mu.Unlock() + + default: p.printer.CompleteItem("dir modified", item, previous, current, s, d) - p.summary.Lock() + p.mu.Lock() p.summary.Dirs.Changed++ - p.summary.Unlock() + p.mu.Unlock() } - } else if current.Type == "file" { - select { - case p.workerCh <- fileWorkerMessage{done: true, filename: item}: - case <-p.closed: - } + case "file": + p.mu.Lock() + p.addProcessed(Counter{Files: 1}) + delete(p.currentFiles, item) + p.mu.Unlock() - if previous == nil { + switch { + case previous == nil: p.printer.CompleteItem("file new", item, previous, current, s, d) - p.summary.Lock() + p.mu.Lock() p.summary.Files.New++ - p.summary.Unlock() - return - } + p.mu.Unlock() - if previous.Equals(*current) { + case previous.Equals(*current): p.printer.CompleteItem("file unchanged", item, previous, current, s, d) - p.summary.Lock() + p.mu.Lock() p.summary.Files.Unchanged++ - p.summary.Unlock() - } else { + p.mu.Unlock() + + default: p.printer.CompleteItem("file modified", item, previous, current, s, d) - p.summary.Lock() + p.mu.Lock() p.summary.Files.Changed++ - p.summary.Unlock() + p.mu.Unlock() } } } // ReportTotal sets the total stats up to now func (p *Progress) ReportTotal(item string, s archiver.ScanStats) { - select { - case p.totalCh <- Counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes}: - case <-p.closed: - } + p.mu.Lock() + defer p.mu.Unlock() + + p.total = Counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes} if item == "" { p.printer.ReportTotal(item, p.start, s) - close(p.totalCh) + p.scanStarted = true return } } // Finish prints the finishing messages. -func (p *Progress) Finish(snapshotID restic.ID) { +func (p *Progress) Finish(snapshotID restic.ID, dryrun bool) { // wait for the status update goroutine to shut down <-p.closed - p.printer.Finish(snapshotID, p.start, p.summary, p.dry) -} - -// SetMinUpdatePause sets b.MinUpdatePause. -func (p *Progress) SetMinUpdatePause(d time.Duration) { - p.MinUpdatePause = d -} - -// SetDryRun marks the backup as a "dry run". -func (p *Progress) SetDryRun() { - p.dry = true + p.printer.Finish(snapshotID, p.start, &p.summary, dryrun) } diff --git a/internal/ui/backup/progress_test.go b/internal/ui/backup/progress_test.go new file mode 100644 index 000000000..248b24034 --- /dev/null +++ b/internal/ui/backup/progress_test.go @@ -0,0 +1,87 @@ +package backup + +import ( + "context" + "io" + "sync" + "testing" + "time" + + "github.com/restic/restic/internal/archiver" + "github.com/restic/restic/internal/restic" +) + +type mockPrinter struct { + sync.Mutex + dirUnchanged, fileNew bool + id restic.ID +} + +func (p *mockPrinter) Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) { +} +func (p *mockPrinter) Error(item string, err error) error { return err } +func (p *mockPrinter) ScannerError(item string, err error) error { return err } + +func (p *mockPrinter) CompleteItem(messageType string, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) { + p.Lock() + defer p.Unlock() + + switch messageType { + case "dir unchanged": + p.dirUnchanged = true + case "file new": + p.fileNew = true + } +} + +func (p *mockPrinter) ReportTotal(_ string, _ time.Time, _ archiver.ScanStats) {} +func (p *mockPrinter) Finish(id restic.ID, _ time.Time, summary *Summary, dryRun bool) { + p.Lock() + defer p.Unlock() + + _ = *summary // Should not be nil. + p.id = id +} + +func (p *mockPrinter) Reset() {} + +func (p *mockPrinter) Stdout() io.WriteCloser { return nil } +func (p *mockPrinter) Stderr() io.WriteCloser { return nil } + +func (p *mockPrinter) P(msg string, args ...interface{}) {} +func (p *mockPrinter) V(msg string, args ...interface{}) {} + +func TestProgress(t *testing.T) { + t.Parallel() + + prnt := &mockPrinter{} + prog := NewProgress(prnt, time.Millisecond) + + ctx, cancel := context.WithCancel(context.Background()) + go prog.Run(ctx) + + prog.StartFile("foo") + prog.CompleteBlob(1024) + + // "dir unchanged" + node := restic.Node{Type: "dir"} + prog.CompleteItem("foo", &node, &node, archiver.ItemStats{}, 0) + // "file new" + node.Type = "file" + prog.CompleteItem("foo", nil, &node, archiver.ItemStats{}, 0) + + time.Sleep(10 * time.Millisecond) + cancel() + id := restic.NewRandomID() + prog.Finish(id, false) + + if !prnt.dirUnchanged { + t.Error(`"dir unchanged" event not seen`) + } + if !prnt.fileNew { + t.Error(`"file new" event not seen`) + } + if prnt.id != id { + t.Errorf("id not stored (has %v)", prnt.id) + } +}