From 448419990ca95c0b20deaa8ab378d7d49d97a4ea Mon Sep 17 00:00:00 2001 From: Dan Willoughby Date: Tue, 26 Jan 2021 12:52:00 -0700 Subject: [PATCH] Refactor backup progress Move the shared logic into the progress Allows logic to be shared with forth coming restore status --- cmd/restic/cmd_backup.go | 72 +++----- internal/ui/backup.go | 293 ++++-------------------------- internal/ui/json/backup.go | 353 +++++++------------------------------ internal/ui/progress.go | 321 +++++++++++++++++++++++++++++++++ 4 files changed, 444 insertions(+), 595 deletions(-) create mode 100644 internal/ui/progress.go diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index ef229ee8a..c67393b01 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -527,39 +527,17 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina return err } - type ArchiveProgressReporter interface { - CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) - StartFile(filename string) - CompleteBlob(filename string, bytes uint64) - ScannerError(item string, fi os.FileInfo, err error) error - ReportTotal(item string, s archiver.ScanStats) - SetMinUpdatePause(d time.Duration) - Run(ctx context.Context) error - Error(item string, fi os.FileInfo, err error) error - Finish(snapshotID restic.ID) - SetDryRun() - - // ui.StdioWrapper - Stdout() io.WriteCloser - Stderr() io.WriteCloser - - // ui.Message - E(msg string, args ...interface{}) - P(msg string, args ...interface{}) - V(msg string, args ...interface{}) - VV(msg string, args ...interface{}) - } - - var p ArchiveProgressReporter + var progressPrinter ui.ProgressPrinter if gopts.JSON { - p = json.NewBackup(term, gopts.verbosity) + progressPrinter = json.NewBackup(term, gopts.verbosity) } else { - p = ui.NewBackup(term, gopts.verbosity) + progressPrinter = ui.NewBackup(term, gopts.verbosity) } + progressReporter := ui.NewProgress(progressPrinter) if opts.DryRun { repo.SetDryRun() - p.SetDryRun() + progressPrinter.SetDryRun() } // use the terminal for stdout/stderr @@ -567,14 +545,14 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina defer func() { gopts.stdout, gopts.stderr = prevStdout, prevStderr }() - gopts.stdout, gopts.stderr = p.Stdout(), p.Stderr() + gopts.stdout, gopts.stderr = progressPrinter.Stdout(), progressPrinter.Stderr() - p.SetMinUpdatePause(calculateProgressInterval(!gopts.Quiet)) + progressReporter.SetMinUpdatePause(calculateProgressInterval(!gopts.Quiet)) - t.Go(func() error { return p.Run(t.Context(gopts.ctx)) }) + t.Go(func() error { return progressReporter.Run(t.Context(gopts.ctx)) }) if !gopts.JSON { - p.V("lock repository") + progressPrinter.V("lock repository") } lock, err := lockRepo(gopts.ctx, repo) defer unlockRepo(lock) @@ -595,7 +573,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina } if !gopts.JSON { - p.V("load index files") + progressPrinter.V("load index files") } err = repo.LoadIndex(gopts.ctx) if err != nil { @@ -609,9 +587,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina if !gopts.JSON { if parentSnapshotID != nil { - p.P("using parent snapshot %v\n", parentSnapshotID.Str()) + progressPrinter.P("using parent snapshot %v\n", parentSnapshotID.Str()) } else { - p.P("no parent snapshot found, will read all files\n") + progressPrinter.P("no parent snapshot found, will read all files\n") } } @@ -640,12 +618,12 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina } errorHandler := func(item string, err error) error { - return p.Error(item, nil, err) + return progressReporter.Error(item, nil, err) } messageHandler := func(msg string, args ...interface{}) { if !gopts.JSON { - p.P(msg, args...) + progressPrinter.P(msg, args...) } } @@ -655,7 +633,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina } if opts.Stdin { if !gopts.JSON { - p.V("read data from stdin") + progressPrinter.V("read data from stdin") } filename := path.Join("/", opts.StdinFilename) targetFS = &fs.Reader{ @@ -670,11 +648,11 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina sc := archiver.NewScanner(targetFS) sc.SelectByName = selectByNameFilter sc.Select = selectFilter - sc.Error = p.ScannerError - sc.Result = p.ReportTotal + sc.Error = progressReporter.ScannerError + sc.Result = progressReporter.ReportTotal if !gopts.JSON { - p.V("start scan on %v", targets) + progressPrinter.V("start scan on %v", targets) } t.Go(func() error { return sc.Scan(t.Context(gopts.ctx), targets) }) @@ -685,11 +663,11 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina success := true arch.Error = func(item string, fi os.FileInfo, err error) error { success = false - return p.Error(item, fi, err) + return progressReporter.Error(item, fi, err) } - arch.CompleteItem = p.CompleteItem - arch.StartFile = p.StartFile - arch.CompleteBlob = p.CompleteBlob + arch.CompleteItem = progressReporter.CompleteItem + arch.StartFile = progressReporter.StartFile + arch.CompleteBlob = progressReporter.CompleteBlob if opts.IgnoreInode { // --ignore-inode implies --ignore-ctime: on FUSE, the ctime is not @@ -713,7 +691,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina } if !gopts.JSON { - p.V("start backup on %v", targets) + progressPrinter.V("start backup on %v", targets) } _, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts) @@ -729,9 +707,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina } // Report finished execution - p.Finish(id) + progressReporter.Finish(id) if !gopts.JSON && !opts.DryRun { - p.P("snapshot %s saved\n", id.Str()) + progressPrinter.P("snapshot %s saved\n", id.Str()) } if !success { return ErrInvalidSourceData diff --git a/internal/ui/backup.go b/internal/ui/backup.go index 59f85487f..1ae3babac 100644 --- a/internal/ui/backup.go +++ b/internal/ui/backup.go @@ -1,58 +1,23 @@ package ui import ( - "context" "fmt" "os" "sort" - "sync" "time" "github.com/restic/restic/internal/archiver" "github.com/restic/restic/internal/restic" - "github.com/restic/restic/internal/ui/signals" "github.com/restic/restic/internal/ui/termstatus" ) -type counter struct { - Files, Dirs uint - Bytes uint64 -} - -type fileWorkerMessage struct { - filename string - done bool -} - // Backup reports progress for the `backup` command. type Backup struct { *Message *StdioWrapper - MinUpdatePause time.Duration - - term *termstatus.Terminal - start time.Time - - totalBytes uint64 - dry bool // true if writes are faked - - totalCh chan counter - processedCh chan counter - errCh chan struct{} - workerCh chan fileWorkerMessage - closed chan struct{} - - summary struct { - sync.Mutex - Files, Dirs struct { - New uint - Changed uint - Unchanged uint - } - ProcessedBytes uint64 - archiver.ItemStats - } + term *termstatus.Terminal + dry bool // true if writes are faked } // NewBackup returns a new backup progress reporter. @@ -61,102 +26,16 @@ func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup { Message: NewMessage(term, verbosity), StdioWrapper: NewStdioWrapper(term), term: term, - start: time.Now(), - - // limit to 60fps by default - MinUpdatePause: time.Second / 60, - - totalCh: make(chan counter), - processedCh: make(chan counter), - errCh: make(chan struct{}), - workerCh: make(chan fileWorkerMessage), - closed: make(chan struct{}), - } -} - -// Run regularly updates the status lines. It should be called in a separate -// goroutine. -func (b *Backup) Run(ctx context.Context) error { - var ( - lastUpdate time.Time - total, processed counter - errors uint - started bool - currentFiles = make(map[string]struct{}) - secondsRemaining uint64 - ) - - t := time.NewTicker(time.Second) - signalsCh := signals.GetProgressChannel() - defer t.Stop() - defer close(b.closed) - // Reset status when finished - defer func() { - if b.term.CanUpdateStatus() { - b.term.SetStatus([]string{""}) - } - }() - - for { - forceUpdate := false - - select { - case <-ctx.Done(): - return nil - case t, ok := <-b.totalCh: - if ok { - total = t - started = true - } else { - // scan has finished - b.totalCh = nil - b.totalBytes = total.Bytes - } - case s := <-b.processedCh: - processed.Files += s.Files - processed.Dirs += s.Dirs - processed.Bytes += s.Bytes - started = true - case <-b.errCh: - errors++ - started = true - case m := <-b.workerCh: - if m.done { - delete(currentFiles, m.filename) - } else { - currentFiles[m.filename] = struct{}{} - } - case <-t.C: - if !started { - continue - } - - if b.totalCh == nil { - secs := float64(time.Since(b.start) / time.Second) - todo := float64(total.Bytes - processed.Bytes) - secondsRemaining = uint64(secs / float64(processed.Bytes) * todo) - } - case <-signalsCh: - forceUpdate = true - } - - // limit update frequency - if !forceUpdate && (time.Since(lastUpdate) < b.MinUpdatePause || b.MinUpdatePause == 0) { - continue - } - lastUpdate = time.Now() - - b.update(total, processed, errors, currentFiles, secondsRemaining) } } // update updates the status lines. -func (b *Backup) update(total, processed counter, errors uint, currentFiles map[string]struct{}, secs uint64) { +func (b *Backup) Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) { var status string if total.Files == 0 && total.Dirs == 0 { // no total count available yet status = fmt.Sprintf("[%s] %v files, %s, %d errors", - formatDuration(time.Since(b.start)), + formatDuration(time.Since(start)), processed.Files, formatBytes(processed.Bytes), errors, ) } else { @@ -170,7 +49,7 @@ func (b *Backup) update(total, processed counter, errors uint, currentFiles map[ // include totals status = fmt.Sprintf("[%s] %s%v files %s, total %v files %v, %d errors%s", - formatDuration(time.Since(b.start)), + formatDuration(time.Since(start)), percent, processed.Files, formatBytes(processed.Bytes), @@ -201,29 +80,9 @@ func (b *Backup) ScannerError(item string, fi os.FileInfo, err error) error { // Error is the error callback function for the archiver, it prints the error and returns nil. func (b *Backup) Error(item string, fi os.FileInfo, err error) error { b.E("error: %v\n", err) - select { - case b.errCh <- struct{}{}: - case <-b.closed: - } return nil } -// StartFile is called when a file is being processed by a worker. -func (b *Backup) StartFile(filename string) { - select { - case b.workerCh <- fileWorkerMessage{filename: filename}: - case <-b.closed: - } -} - -// CompleteBlob is called for all saved blobs for files. -func (b *Backup) CompleteBlob(filename string, bytes uint64) { - select { - case b.processedCh <- counter{Bytes: bytes}: - case <-b.closed: - } -} - func formatPercent(numerator uint64, denominator uint64) string { if denominator == 0 { return "" @@ -273,138 +132,60 @@ func formatBytes(c uint64) string { // CompleteItem is the status callback function for the archiver when a // file/dir has been saved successfully. -func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) { - b.summary.Lock() - b.summary.ItemStats.Add(s) - - // for the last item "/", current is nil - if current != nil { - b.summary.ProcessedBytes += current.Size - } - - b.summary.Unlock() - - if current == nil { - // error occurred, tell the status display to remove the line - select { - case b.workerCh <- fileWorkerMessage{filename: item, done: true}: - case <-b.closed: - } - return - } - - switch current.Type { - case "file": - select { - case b.processedCh <- counter{Files: 1}: - case <-b.closed: - } - select { - case b.workerCh <- fileWorkerMessage{filename: item, done: true}: - case <-b.closed: - } - case "dir": - select { - case b.processedCh <- counter{Dirs: 1}: - case <-b.closed: - } - } - - if current.Type == "dir" { - if previous == nil { - b.VV("new %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize)) - b.summary.Lock() - b.summary.Dirs.New++ - b.summary.Unlock() - return - } - - if previous.Equals(*current) { - b.VV("unchanged %v", item) - b.summary.Lock() - b.summary.Dirs.Unchanged++ - b.summary.Unlock() - } else { - b.VV("modified %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize)) - b.summary.Lock() - b.summary.Dirs.Changed++ - b.summary.Unlock() - } - - } else if current.Type == "file" { - select { - case b.workerCh <- fileWorkerMessage{done: true, filename: item}: - case <-b.closed: - } - - if previous == nil { - b.VV("new %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize)) - b.summary.Lock() - b.summary.Files.New++ - b.summary.Unlock() - return - } - - if previous.Equals(*current) { - b.VV("unchanged %v", item) - b.summary.Lock() - b.summary.Files.Unchanged++ - b.summary.Unlock() - } else { - b.VV("modified %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize)) - b.summary.Lock() - b.summary.Files.Changed++ - b.summary.Unlock() - } +func (b *Backup) CompleteItem(messageType, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) { + switch messageType { + case "dir new": + b.VV("new %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize)) + case "dir unchanged": + b.VV("unchanged %v", item) + case "dir modified": + b.VV("modified %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize)) + case "file new": + b.VV("new %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize)) + case "file unchanged": + b.VV("unchanged %v", item) + case "file modified": + b.VV("modified %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize)) } } // ReportTotal sets the total stats up to now -func (b *Backup) ReportTotal(item string, s archiver.ScanStats) { - select { - case b.totalCh <- counter{Files: s.Files, Dirs: s.Dirs, Bytes: s.Bytes}: - case <-b.closed: - } - +func (b *Backup) ReportTotal(item string, start time.Time, s archiver.ScanStats) { if item == "" { b.V("scan finished in %.3fs: %v files, %s", - time.Since(b.start).Seconds(), + time.Since(start).Seconds(), s.Files, formatBytes(s.Bytes), ) - close(b.totalCh) - return + } +} + +// Reset status +func (b *Backup) Reset() { + if b.term.CanUpdateStatus() { + b.term.SetStatus([]string{""}) } } // Finish prints the finishing messages. -func (b *Backup) Finish(snapshotID restic.ID) { - // wait for the status update goroutine to shut down - <-b.closed - +func (b *Backup) Finish(snapshotID restic.ID, start time.Time, summary *Summary) { b.P("\n") - b.P("Files: %5d new, %5d changed, %5d unmodified\n", b.summary.Files.New, b.summary.Files.Changed, b.summary.Files.Unchanged) - b.P("Dirs: %5d new, %5d changed, %5d unmodified\n", b.summary.Dirs.New, b.summary.Dirs.Changed, b.summary.Dirs.Unchanged) - b.V("Data Blobs: %5d new\n", b.summary.ItemStats.DataBlobs) - b.V("Tree Blobs: %5d new\n", b.summary.ItemStats.TreeBlobs) + b.P("Files: %5d new, %5d changed, %5d unmodified\n", summary.Files.New, summary.Files.Changed, summary.Files.Unchanged) + b.P("Dirs: %5d new, %5d changed, %5d unmodified\n", summary.Dirs.New, summary.Dirs.Changed, summary.Dirs.Unchanged) + b.V("Data Blobs: %5d new\n", summary.ItemStats.DataBlobs) + b.V("Tree Blobs: %5d new\n", summary.ItemStats.TreeBlobs) verb := "Added" if b.dry { verb = "Would add" } - b.P("%s to the repo: %-5s\n", verb, formatBytes(b.summary.ItemStats.DataSize+b.summary.ItemStats.TreeSize)) + b.P("%s to the repo: %-5s\n", verb, formatBytes(summary.ItemStats.DataSize+summary.ItemStats.TreeSize)) b.P("\n") b.P("processed %v files, %v in %s", - b.summary.Files.New+b.summary.Files.Changed+b.summary.Files.Unchanged, - formatBytes(b.summary.ProcessedBytes), - formatDuration(time.Since(b.start)), + summary.Files.New+summary.Files.Changed+summary.Files.Unchanged, + formatBytes(summary.ProcessedBytes), + formatDuration(time.Since(start)), ) } -// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the -// ArchiveProgressReporter interface. -func (b *Backup) SetMinUpdatePause(d time.Duration) { - b.MinUpdatePause = d -} - func (b *Backup) SetDryRun() { b.dry = true } diff --git a/internal/ui/json/backup.go b/internal/ui/json/backup.go index 70bc23801..51f661030 100644 --- a/internal/ui/json/backup.go +++ b/internal/ui/json/backup.go @@ -2,11 +2,9 @@ package json import ( "bytes" - "context" "encoding/json" "os" "sort" - "sync" "time" "github.com/restic/restic/internal/archiver" @@ -15,46 +13,14 @@ import ( "github.com/restic/restic/internal/ui/termstatus" ) -type counter struct { - Files, Dirs, Bytes uint64 -} - -type fileWorkerMessage struct { - filename string - done bool -} - // Backup reports progress for the `backup` command in JSON. type Backup struct { *ui.Message *ui.StdioWrapper - MinUpdatePause time.Duration - - term *termstatus.Terminal - v uint - start time.Time - dry bool - - totalBytes uint64 - - totalCh chan counter - processedCh chan counter - errCh chan struct{} - workerCh chan fileWorkerMessage - finished chan struct{} - closed chan struct{} - - summary struct { - sync.Mutex - Files, Dirs struct { - New uint - Changed uint - Unchanged uint - } - ProcessedBytes uint64 - archiver.ItemStats - } + term *termstatus.Terminal + v uint + dry bool } // NewBackup returns a new backup progress reporter. @@ -64,17 +30,6 @@ func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup { StdioWrapper: ui.NewStdioWrapper(term), term: term, v: verbosity, - start: time.Now(), - - // limit to 60fps by default - MinUpdatePause: time.Second / 60, - - totalCh: make(chan counter), - processedCh: make(chan counter), - errCh: make(chan struct{}), - workerCh: make(chan fileWorkerMessage), - finished: make(chan struct{}), - closed: make(chan struct{}), } } @@ -95,78 +50,11 @@ func (b *Backup) error(status interface{}) { b.term.Error(toJSONString(status)) } -// Run regularly updates the status lines. It should be called in a separate -// goroutine. -func (b *Backup) Run(ctx context.Context) error { - var ( - lastUpdate time.Time - total, processed counter - errors uint - started bool - currentFiles = make(map[string]struct{}) - secondsRemaining uint64 - ) - - t := time.NewTicker(time.Second) - defer t.Stop() - defer close(b.closed) - - for { - select { - case <-ctx.Done(): - return nil - case <-b.finished: - started = false - case t, ok := <-b.totalCh: - if ok { - total = t - started = true - } else { - // scan has finished - b.totalCh = nil - b.totalBytes = total.Bytes - } - case s := <-b.processedCh: - processed.Files += s.Files - processed.Dirs += s.Dirs - processed.Bytes += s.Bytes - started = true - case <-b.errCh: - errors++ - started = true - case m := <-b.workerCh: - if m.done { - delete(currentFiles, m.filename) - } else { - currentFiles[m.filename] = struct{}{} - } - case <-t.C: - if !started { - continue - } - - if b.totalCh == nil { - secs := float64(time.Since(b.start) / time.Second) - todo := float64(total.Bytes - processed.Bytes) - secondsRemaining = uint64(secs / float64(processed.Bytes) * todo) - } - } - - // limit update frequency - if time.Since(lastUpdate) < b.MinUpdatePause { - continue - } - lastUpdate = time.Now() - - b.update(total, processed, errors, currentFiles, secondsRemaining) - } -} - // update updates the status lines. -func (b *Backup) update(total, processed counter, errors uint, currentFiles map[string]struct{}, secs uint64) { +func (b *Backup) Update(total, processed ui.Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) { status := statusUpdate{ MessageType: "status", - SecondsElapsed: uint64(time.Since(b.start) / time.Second), + SecondsElapsed: uint64(time.Since(start) / time.Second), SecondsRemaining: secs, TotalFiles: total.Files, FilesDone: processed.Files, @@ -207,211 +95,92 @@ func (b *Backup) Error(item string, fi os.FileInfo, err error) error { During: "archival", Item: item, }) - select { - case b.errCh <- struct{}{}: - case <-b.closed: - } return nil } -// StartFile is called when a file is being processed by a worker. -func (b *Backup) StartFile(filename string) { - select { - case b.workerCh <- fileWorkerMessage{filename: filename}: - case <-b.closed: - } -} - -// CompleteBlob is called for all saved blobs for files. -func (b *Backup) CompleteBlob(filename string, bytes uint64) { - select { - case b.processedCh <- counter{Bytes: bytes}: - case <-b.closed: - } -} - // CompleteItem is the status callback function for the archiver when a // file/dir has been saved successfully. -func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) { - b.summary.Lock() - b.summary.ItemStats.Add(s) - b.summary.Unlock() - - if current == nil { - // error occurred, tell the status display to remove the line - select { - case b.workerCh <- fileWorkerMessage{filename: item, done: true}: - case <-b.closed: - } +func (b *Backup) CompleteItem(messageType, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) { + if b.v < 2 { return } - b.summary.ProcessedBytes += current.Size - - switch current.Type { - case "file": - select { - case b.processedCh <- counter{Files: 1}: - case <-b.closed: - } - select { - case b.workerCh <- fileWorkerMessage{filename: item, done: true}: - case <-b.closed: - } - case "dir": - select { - case b.processedCh <- counter{Dirs: 1}: - case <-b.closed: - } - } - - if current.Type == "dir" { - if previous == nil { - if b.v >= 3 { - b.print(verboseUpdate{ - MessageType: "verbose_status", - Action: "new", - Item: item, - Duration: d.Seconds(), - DataSize: s.DataSize, - MetadataSize: s.TreeSize, - }) - } - b.summary.Lock() - b.summary.Dirs.New++ - b.summary.Unlock() - return - } - - if previous.Equals(*current) { - if b.v >= 3 { - b.print(verboseUpdate{ - MessageType: "verbose_status", - Action: "unchanged", - Item: item, - }) - } - b.summary.Lock() - b.summary.Dirs.Unchanged++ - b.summary.Unlock() - } else { - if b.v >= 3 { - b.print(verboseUpdate{ - MessageType: "verbose_status", - Action: "modified", - Item: item, - Duration: d.Seconds(), - DataSize: s.DataSize, - MetadataSize: s.TreeSize, - }) - } - b.summary.Lock() - b.summary.Dirs.Changed++ - b.summary.Unlock() - } - - } else if current.Type == "file" { - select { - case b.workerCh <- fileWorkerMessage{done: true, filename: item}: - case <-b.closed: - } - - if previous == nil { - if b.v >= 3 { - b.print(verboseUpdate{ - MessageType: "verbose_status", - Action: "new", - Item: item, - Duration: d.Seconds(), - DataSize: s.DataSize, - }) - } - b.summary.Lock() - b.summary.Files.New++ - b.summary.Unlock() - return - } - - if previous.Equals(*current) { - if b.v >= 3 { - b.print(verboseUpdate{ - MessageType: "verbose_status", - Action: "unchanged", - Item: item, - }) - } - b.summary.Lock() - b.summary.Files.Unchanged++ - b.summary.Unlock() - } else { - if b.v >= 3 { - b.print(verboseUpdate{ - MessageType: "verbose_status", - Action: "modified", - Item: item, - Duration: d.Seconds(), - DataSize: s.DataSize, - }) - } - b.summary.Lock() - b.summary.Files.Changed++ - b.summary.Unlock() - } + switch messageType { + case "dir new": + b.print(verboseUpdate{ + MessageType: "verbose_status", + Action: "new", + Item: item, + Duration: d.Seconds(), + DataSize: s.DataSize, + MetadataSize: s.TreeSize, + }) + case "dir unchanged": + b.print(verboseUpdate{ + MessageType: "verbose_status", + Action: "unchanged", + Item: item, + }) + case "dir modified": + b.print(verboseUpdate{ + MessageType: "verbose_status", + Action: "modified", + Item: item, + Duration: d.Seconds(), + DataSize: s.DataSize, + MetadataSize: s.TreeSize, + }) + case "file new": + b.print(verboseUpdate{ + MessageType: "verbose_status", + Action: "new", + Item: item, + Duration: d.Seconds(), + DataSize: s.DataSize, + }) + case "file unchanged": + b.print(verboseUpdate{ + MessageType: "verbose_status", + Action: "unchanged", + Item: item, + }) + case "file modified": + b.print(verboseUpdate{ + MessageType: "verbose_status", + Action: "modified", + Item: item, + Duration: d.Seconds(), + DataSize: s.DataSize, + }) } } // ReportTotal sets the total stats up to now -func (b *Backup) ReportTotal(item string, s archiver.ScanStats) { - select { - case b.totalCh <- counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes}: - case <-b.closed: - } - +func (b *Backup) ReportTotal(item string, start time.Time, s archiver.ScanStats) { if item == "" { if b.v >= 2 { b.print(verboseUpdate{ MessageType: "status", Action: "scan_finished", - Duration: time.Since(b.start).Seconds(), + Duration: time.Since(start).Seconds(), DataSize: s.Bytes, TotalFiles: s.Files, }) } - close(b.totalCh) return } } // Finish prints the finishing messages. -func (b *Backup) Finish(snapshotID restic.ID) { - select { - case b.finished <- struct{}{}: - case <-b.closed: - } - +func (b *Backup) Finish(snapshotID restic.ID, start time.Time, summary *ui.Summary) { b.print(summaryOutput{ - MessageType: "summary", - FilesNew: b.summary.Files.New, - FilesChanged: b.summary.Files.Changed, - FilesUnmodified: b.summary.Files.Unchanged, - DirsNew: b.summary.Dirs.New, - DirsChanged: b.summary.Dirs.Changed, - DirsUnmodified: b.summary.Dirs.Unchanged, - DataBlobs: b.summary.ItemStats.DataBlobs, - TreeBlobs: b.summary.ItemStats.TreeBlobs, - DataAdded: b.summary.ItemStats.DataSize + b.summary.ItemStats.TreeSize, - TotalFilesProcessed: b.summary.Files.New + b.summary.Files.Changed + b.summary.Files.Unchanged, - TotalBytesProcessed: b.summary.ProcessedBytes, - TotalDuration: time.Since(b.start).Seconds(), - SnapshotID: snapshotID.Str(), - DryRun: b.dry, + MessageType: "summary", + SnapshotID: snapshotID.Str(), }) } -// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the -// ArchiveProgressReporter interface. -func (b *Backup) SetMinUpdatePause(d time.Duration) { - b.MinUpdatePause = d +// Reset no-op +func (b *Backup) Reset() { } // SetDryRun marks the backup as a "dry run". diff --git a/internal/ui/progress.go b/internal/ui/progress.go new file mode 100644 index 000000000..5a9581419 --- /dev/null +++ b/internal/ui/progress.go @@ -0,0 +1,321 @@ +package ui + +import ( + "context" + "io" + "os" + "sync" + "time" + + "github.com/restic/restic/internal/archiver" + "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/ui/signals" +) + +type ProgressPrinter interface { + Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) + Error(item string, fi os.FileInfo, err error) error + ScannerError(item string, fi os.FileInfo, err error) error + CompleteItem(messageType string, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) + ReportTotal(item string, start time.Time, s archiver.ScanStats) + Finish(snapshotID restic.ID, start time.Time, summary *Summary) + Reset() + SetDryRun() + + // ui.StdioWrapper + Stdout() io.WriteCloser + Stderr() io.WriteCloser + + E(msg string, args ...interface{}) + P(msg string, args ...interface{}) + V(msg string, args ...interface{}) + VV(msg string, args ...interface{}) +} + +type Counter struct { + Files, Dirs, Bytes uint64 +} + +type fileWorkerMessage struct { + filename string + done bool +} + +type ProgressReporter interface { + CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) + StartFile(filename string) + CompleteBlob(filename string, bytes uint64) + ScannerError(item string, fi os.FileInfo, err error) error + ReportTotal(item string, s archiver.ScanStats) + SetMinUpdatePause(d time.Duration) + Run(ctx context.Context) error + Error(item string, fi os.FileInfo, err error) error + Finish(snapshotID restic.ID) +} + +type Summary struct { + sync.Mutex + Files, Dirs struct { + New uint + Changed uint + Unchanged uint + } + ProcessedBytes uint64 + TotalErrors uint + archiver.ItemStats +} + +type Progress struct { + MinUpdatePause time.Duration + + start time.Time + + totalBytes uint64 + + totalCh chan Counter + processedCh chan Counter + errCh chan struct{} + workerCh chan fileWorkerMessage + closed chan struct{} + + summary *Summary + printer ProgressPrinter +} + +func NewProgress(printer ProgressPrinter) *Progress { + return &Progress{ + // limit to 60fps by default + MinUpdatePause: time.Second / 60, + start: time.Now(), + + totalCh: make(chan Counter), + processedCh: make(chan Counter), + errCh: make(chan struct{}), + workerCh: make(chan fileWorkerMessage), + closed: make(chan struct{}), + + summary: &Summary{}, + + printer: printer, + } +} + +func (p *Progress) Run(ctx context.Context) error { + var ( + lastUpdate time.Time + total, processed Counter + errors uint + started bool + currentFiles = make(map[string]struct{}) + secondsRemaining uint64 + ) + + t := time.NewTicker(time.Second) + signalsCh := signals.GetProgressChannel() + defer t.Stop() + defer close(p.closed) + // Reset status when finished + defer p.printer.Reset() + + for { + forceUpdate := false + select { + case <-ctx.Done(): + return nil + case t, ok := <-p.totalCh: + if ok { + total = t + started = true + } else { + // scan has finished + p.totalCh = nil + p.totalBytes = total.Bytes + } + case s := <-p.processedCh: + processed.Files += s.Files + processed.Dirs += s.Dirs + processed.Bytes += s.Bytes + started = true + case <-p.errCh: + errors++ + p.summary.Lock() + p.summary.TotalErrors = errors + p.summary.Unlock() + started = true + case m := <-p.workerCh: + if m.done { + delete(currentFiles, m.filename) + } else { + currentFiles[m.filename] = struct{}{} + } + case <-t.C: + if !started { + continue + } + + if p.totalCh == nil { + secs := float64(time.Since(p.start) / time.Second) + todo := float64(total.Bytes - processed.Bytes) + secondsRemaining = uint64(secs / float64(processed.Bytes) * todo) + } + case <-signalsCh: + forceUpdate = true + } + + // limit update frequency + if !forceUpdate && (time.Since(lastUpdate) < p.MinUpdatePause || p.MinUpdatePause == 0) { + continue + } + lastUpdate = time.Now() + + p.printer.Update(total, processed, errors, currentFiles, p.start, secondsRemaining) + } +} + +// ScannerError is the error callback function for the scanner, it prints the +// error in verbose mode and returns nil. +func (p *Progress) ScannerError(item string, fi os.FileInfo, err error) error { + return p.printer.ScannerError(item, fi, err) +} + +// Error is the error callback function for the archiver, it prints the error and returns nil. +func (p *Progress) Error(item string, fi os.FileInfo, err error) error { + cbErr := p.printer.Error(item, fi, err) + + select { + case p.errCh <- struct{}{}: + case <-p.closed: + } + return cbErr +} + +// StartFile is called when a file is being processed by a worker. +func (p *Progress) StartFile(filename string) { + select { + case p.workerCh <- fileWorkerMessage{filename: filename}: + case <-p.closed: + } +} + +// CompleteBlob is called for all saved blobs for files. +func (p *Progress) CompleteBlob(filename string, bytes uint64) { + select { + case p.processedCh <- Counter{Bytes: bytes}: + case <-p.closed: + } +} + +// CompleteItem is the status callback function for the archiver when a +// file/dir has been saved successfully. +func (p *Progress) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) { + p.summary.Lock() + p.summary.ItemStats.Add(s) + + // for the last item "/", current is nil + if current != nil { + p.summary.ProcessedBytes += current.Size + } + + p.summary.Unlock() + + if current == nil { + // error occurred, tell the status display to remove the line + select { + case p.workerCh <- fileWorkerMessage{filename: item, done: true}: + case <-p.closed: + } + return + } + + switch current.Type { + case "file": + select { + case p.processedCh <- Counter{Files: 1}: + case <-p.closed: + } + select { + case p.workerCh <- fileWorkerMessage{filename: item, done: true}: + case <-p.closed: + } + case "dir": + select { + case p.processedCh <- Counter{Dirs: 1}: + case <-p.closed: + } + } + + if current.Type == "dir" { + if previous == nil { + p.printer.CompleteItem("dir new", item, previous, current, s, d) + p.summary.Lock() + p.summary.Dirs.New++ + p.summary.Unlock() + return + } + + if previous.Equals(*current) { + p.printer.CompleteItem("dir unchanged", item, previous, current, s, d) + p.summary.Lock() + p.summary.Dirs.Unchanged++ + p.summary.Unlock() + } else { + p.printer.CompleteItem("dir modified", item, previous, current, s, d) + p.summary.Lock() + p.summary.Dirs.Changed++ + p.summary.Unlock() + } + + } else if current.Type == "file" { + select { + case p.workerCh <- fileWorkerMessage{done: true, filename: item}: + case <-p.closed: + } + + if previous == nil { + p.printer.CompleteItem("file new", item, previous, current, s, d) + p.summary.Lock() + p.summary.Files.New++ + p.summary.Unlock() + return + } + + if previous.Equals(*current) { + p.printer.CompleteItem("file unchanged", item, previous, current, s, d) + p.summary.Lock() + p.summary.Files.Unchanged++ + p.summary.Unlock() + } else { + p.printer.CompleteItem("file modified", item, previous, current, s, d) + p.summary.Lock() + p.summary.Files.Changed++ + p.summary.Unlock() + } + } +} + +// ReportTotal sets the total stats up to now +func (p *Progress) ReportTotal(item string, s archiver.ScanStats) { + select { + case p.totalCh <- Counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes}: + case <-p.closed: + } + + if item == "" { + p.printer.ReportTotal(item, p.start, s) + close(p.totalCh) + return + } +} + +// Finish prints the finishing messages. +func (p *Progress) Finish(snapshotID restic.ID) { + <-p.closed + summary := p.summary + p.printer.Finish(snapshotID, p.start, summary) +} + +// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the +// ArchiveProgressReporter interface. +func (p *Progress) SetMinUpdatePause(d time.Duration) { + p.MinUpdatePause = d +}