2
2
mirror of https://github.com/octoleo/restic.git synced 2024-11-29 00:06:32 +00:00

Merge pull request #3264 from amozoss/upstream-master

Refactor backup progress
This commit is contained in:
MichaelEischer 2021-09-19 14:54:42 +02:00 committed by GitHub
commit 58efe21eca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 794 additions and 930 deletions

View File

@ -0,0 +1,8 @@
Bugfix: Don't print progress for `backup --json --quiet`
Unlike the text output, the json output format still printed progress
information even in quiet mode. This has been fixed by always disabling the
progress output in quiet mode.
https://github.com/restic/restic/issues/2738
https://github.com/restic/restic/pull/3264

View File

@ -24,8 +24,7 @@ import (
"github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/textfile" "github.com/restic/restic/internal/textfile"
"github.com/restic/restic/internal/ui" "github.com/restic/restic/internal/ui/backup"
"github.com/restic/restic/internal/ui/json"
"github.com/restic/restic/internal/ui/termstatus" "github.com/restic/restic/internal/ui/termstatus"
) )
@ -527,39 +526,17 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
return err return err
} }
type ArchiveProgressReporter interface { var progressPrinter backup.ProgressPrinter
CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration)
StartFile(filename string)
CompleteBlob(filename string, bytes uint64)
ScannerError(item string, fi os.FileInfo, err error) error
ReportTotal(item string, s archiver.ScanStats)
SetMinUpdatePause(d time.Duration)
Run(ctx context.Context) error
Error(item string, fi os.FileInfo, err error) error
Finish(snapshotID restic.ID)
SetDryRun()
// ui.StdioWrapper
Stdout() io.WriteCloser
Stderr() io.WriteCloser
// ui.Message
E(msg string, args ...interface{})
P(msg string, args ...interface{})
V(msg string, args ...interface{})
VV(msg string, args ...interface{})
}
var p ArchiveProgressReporter
if gopts.JSON { if gopts.JSON {
p = json.NewBackup(term, gopts.verbosity) progressPrinter = backup.NewJSONProgress(term, gopts.verbosity)
} else { } else {
p = ui.NewBackup(term, gopts.verbosity) progressPrinter = backup.NewTextProgress(term, gopts.verbosity)
} }
progressReporter := backup.NewProgress(progressPrinter)
if opts.DryRun { if opts.DryRun {
repo.SetDryRun() repo.SetDryRun()
p.SetDryRun() progressReporter.SetDryRun()
} }
// use the terminal for stdout/stderr // use the terminal for stdout/stderr
@ -567,14 +544,14 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
defer func() { defer func() {
gopts.stdout, gopts.stderr = prevStdout, prevStderr gopts.stdout, gopts.stderr = prevStdout, prevStderr
}() }()
gopts.stdout, gopts.stderr = p.Stdout(), p.Stderr() gopts.stdout, gopts.stderr = progressPrinter.Stdout(), progressPrinter.Stderr()
p.SetMinUpdatePause(calculateProgressInterval(!gopts.Quiet)) progressReporter.SetMinUpdatePause(calculateProgressInterval(!gopts.Quiet, gopts.JSON))
t.Go(func() error { return p.Run(t.Context(gopts.ctx)) }) t.Go(func() error { return progressReporter.Run(t.Context(gopts.ctx)) })
if !gopts.JSON { if !gopts.JSON {
p.V("lock repository") progressPrinter.V("lock repository")
} }
lock, err := lockRepo(gopts.ctx, repo) lock, err := lockRepo(gopts.ctx, repo)
defer unlockRepo(lock) defer unlockRepo(lock)
@ -595,7 +572,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
} }
if !gopts.JSON { if !gopts.JSON {
p.V("load index files") progressPrinter.V("load index files")
} }
err = repo.LoadIndex(gopts.ctx) err = repo.LoadIndex(gopts.ctx)
if err != nil { if err != nil {
@ -609,9 +586,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
if !gopts.JSON { if !gopts.JSON {
if parentSnapshotID != nil { if parentSnapshotID != nil {
p.P("using parent snapshot %v\n", parentSnapshotID.Str()) progressPrinter.P("using parent snapshot %v\n", parentSnapshotID.Str())
} else { } else {
p.P("no parent snapshot found, will read all files\n") progressPrinter.P("no parent snapshot found, will read all files\n")
} }
} }
@ -640,12 +617,12 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
} }
errorHandler := func(item string, err error) error { errorHandler := func(item string, err error) error {
return p.Error(item, nil, err) return progressReporter.Error(item, nil, err)
} }
messageHandler := func(msg string, args ...interface{}) { messageHandler := func(msg string, args ...interface{}) {
if !gopts.JSON { if !gopts.JSON {
p.P(msg, args...) progressPrinter.P(msg, args...)
} }
} }
@ -655,7 +632,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
} }
if opts.Stdin { if opts.Stdin {
if !gopts.JSON { if !gopts.JSON {
p.V("read data from stdin") progressPrinter.V("read data from stdin")
} }
filename := path.Join("/", opts.StdinFilename) filename := path.Join("/", opts.StdinFilename)
targetFS = &fs.Reader{ targetFS = &fs.Reader{
@ -670,11 +647,11 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
sc := archiver.NewScanner(targetFS) sc := archiver.NewScanner(targetFS)
sc.SelectByName = selectByNameFilter sc.SelectByName = selectByNameFilter
sc.Select = selectFilter sc.Select = selectFilter
sc.Error = p.ScannerError sc.Error = progressReporter.ScannerError
sc.Result = p.ReportTotal sc.Result = progressReporter.ReportTotal
if !gopts.JSON { if !gopts.JSON {
p.V("start scan on %v", targets) progressPrinter.V("start scan on %v", targets)
} }
t.Go(func() error { return sc.Scan(t.Context(gopts.ctx), targets) }) t.Go(func() error { return sc.Scan(t.Context(gopts.ctx), targets) })
@ -685,11 +662,11 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
success := true success := true
arch.Error = func(item string, fi os.FileInfo, err error) error { arch.Error = func(item string, fi os.FileInfo, err error) error {
success = false success = false
return p.Error(item, fi, err) return progressReporter.Error(item, fi, err)
} }
arch.CompleteItem = p.CompleteItem arch.CompleteItem = progressReporter.CompleteItem
arch.StartFile = p.StartFile arch.StartFile = progressReporter.StartFile
arch.CompleteBlob = p.CompleteBlob arch.CompleteBlob = progressReporter.CompleteBlob
if opts.IgnoreInode { if opts.IgnoreInode {
// --ignore-inode implies --ignore-ctime: on FUSE, the ctime is not // --ignore-inode implies --ignore-ctime: on FUSE, the ctime is not
@ -713,7 +690,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
} }
if !gopts.JSON { if !gopts.JSON {
p.V("start backup on %v", targets) progressPrinter.V("start backup on %v", targets)
} }
_, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts) _, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts)
@ -729,9 +706,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
} }
// Report finished execution // Report finished execution
p.Finish(id) progressReporter.Finish(id)
if !gopts.JSON && !opts.DryRun { if !gopts.JSON && !opts.DryRun {
p.P("snapshot %s saved\n", id.Str()) progressPrinter.P("snapshot %s saved\n", id.Str())
} }
if !success { if !success {
return ErrInvalidSourceData return ErrInvalidSourceData

View File

@ -14,7 +14,7 @@ import (
// calculateProgressInterval returns the interval configured via RESTIC_PROGRESS_FPS // calculateProgressInterval returns the interval configured via RESTIC_PROGRESS_FPS
// or if unset returns an interval for 60fps on interactive terminals and 0 (=disabled) // or if unset returns an interval for 60fps on interactive terminals and 0 (=disabled)
// for non-interactive terminals or when run using the --quiet flag // for non-interactive terminals or when run using the --quiet flag
func calculateProgressInterval(show bool) time.Duration { func calculateProgressInterval(show bool, json bool) time.Duration {
interval := time.Second / 60 interval := time.Second / 60
fps, err := strconv.ParseFloat(os.Getenv("RESTIC_PROGRESS_FPS"), 64) fps, err := strconv.ParseFloat(os.Getenv("RESTIC_PROGRESS_FPS"), 64)
if err == nil && fps > 0 { if err == nil && fps > 0 {
@ -22,7 +22,7 @@ func calculateProgressInterval(show bool) time.Duration {
fps = 60 fps = 60
} }
interval = time.Duration(float64(time.Second) / fps) interval = time.Duration(float64(time.Second) / fps)
} else if !stdoutCanUpdateStatus() || !show { } else if !json && !stdoutCanUpdateStatus() || !show {
interval = 0 interval = 0
} }
return interval return interval
@ -33,7 +33,7 @@ func newProgressMax(show bool, max uint64, description string) *progress.Counter
if !show { if !show {
return nil return nil
} }
interval := calculateProgressInterval(show) interval := calculateProgressInterval(show, false)
canUpdateStatus := stdoutCanUpdateStatus() canUpdateStatus := stdoutCanUpdateStatus()
return progress.New(interval, max, func(v uint64, max uint64, d time.Duration, final bool) { return progress.New(interval, max, func(v uint64, max uint64, d time.Duration, final bool) {

View File

@ -1,410 +0,0 @@
package ui
import (
"context"
"fmt"
"os"
"sort"
"sync"
"time"
"github.com/restic/restic/internal/archiver"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/signals"
"github.com/restic/restic/internal/ui/termstatus"
)
type counter struct {
Files, Dirs uint
Bytes uint64
}
type fileWorkerMessage struct {
filename string
done bool
}
// Backup reports progress for the `backup` command.
type Backup struct {
*Message
*StdioWrapper
MinUpdatePause time.Duration
term *termstatus.Terminal
start time.Time
totalBytes uint64
dry bool // true if writes are faked
totalCh chan counter
processedCh chan counter
errCh chan struct{}
workerCh chan fileWorkerMessage
closed chan struct{}
summary struct {
sync.Mutex
Files, Dirs struct {
New uint
Changed uint
Unchanged uint
}
ProcessedBytes uint64
archiver.ItemStats
}
}
// NewBackup returns a new backup progress reporter.
func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup {
return &Backup{
Message: NewMessage(term, verbosity),
StdioWrapper: NewStdioWrapper(term),
term: term,
start: time.Now(),
// limit to 60fps by default
MinUpdatePause: time.Second / 60,
totalCh: make(chan counter),
processedCh: make(chan counter),
errCh: make(chan struct{}),
workerCh: make(chan fileWorkerMessage),
closed: make(chan struct{}),
}
}
// Run regularly updates the status lines. It should be called in a separate
// goroutine.
func (b *Backup) Run(ctx context.Context) error {
var (
lastUpdate time.Time
total, processed counter
errors uint
started bool
currentFiles = make(map[string]struct{})
secondsRemaining uint64
)
t := time.NewTicker(time.Second)
signalsCh := signals.GetProgressChannel()
defer t.Stop()
defer close(b.closed)
// Reset status when finished
defer func() {
if b.term.CanUpdateStatus() {
b.term.SetStatus([]string{""})
}
}()
for {
forceUpdate := false
select {
case <-ctx.Done():
return nil
case t, ok := <-b.totalCh:
if ok {
total = t
started = true
} else {
// scan has finished
b.totalCh = nil
b.totalBytes = total.Bytes
}
case s := <-b.processedCh:
processed.Files += s.Files
processed.Dirs += s.Dirs
processed.Bytes += s.Bytes
started = true
case <-b.errCh:
errors++
started = true
case m := <-b.workerCh:
if m.done {
delete(currentFiles, m.filename)
} else {
currentFiles[m.filename] = struct{}{}
}
case <-t.C:
if !started {
continue
}
if b.totalCh == nil {
secs := float64(time.Since(b.start) / time.Second)
todo := float64(total.Bytes - processed.Bytes)
secondsRemaining = uint64(secs / float64(processed.Bytes) * todo)
}
case <-signalsCh:
forceUpdate = true
}
// limit update frequency
if !forceUpdate && (time.Since(lastUpdate) < b.MinUpdatePause || b.MinUpdatePause == 0) {
continue
}
lastUpdate = time.Now()
b.update(total, processed, errors, currentFiles, secondsRemaining)
}
}
// update updates the status lines.
func (b *Backup) update(total, processed counter, errors uint, currentFiles map[string]struct{}, secs uint64) {
var status string
if total.Files == 0 && total.Dirs == 0 {
// no total count available yet
status = fmt.Sprintf("[%s] %v files, %s, %d errors",
formatDuration(time.Since(b.start)),
processed.Files, formatBytes(processed.Bytes), errors,
)
} else {
var eta, percent string
if secs > 0 && processed.Bytes < total.Bytes {
eta = fmt.Sprintf(" ETA %s", formatSeconds(secs))
percent = formatPercent(processed.Bytes, total.Bytes)
percent += " "
}
// include totals
status = fmt.Sprintf("[%s] %s%v files %s, total %v files %v, %d errors%s",
formatDuration(time.Since(b.start)),
percent,
processed.Files,
formatBytes(processed.Bytes),
total.Files,
formatBytes(total.Bytes),
errors,
eta,
)
}
lines := make([]string, 0, len(currentFiles)+1)
for filename := range currentFiles {
lines = append(lines, filename)
}
sort.Strings(lines)
lines = append([]string{status}, lines...)
b.term.SetStatus(lines)
}
// ScannerError is the error callback function for the scanner, it prints the
// error in verbose mode and returns nil.
func (b *Backup) ScannerError(item string, fi os.FileInfo, err error) error {
b.V("scan: %v\n", err)
return nil
}
// Error is the error callback function for the archiver, it prints the error and returns nil.
func (b *Backup) Error(item string, fi os.FileInfo, err error) error {
b.E("error: %v\n", err)
select {
case b.errCh <- struct{}{}:
case <-b.closed:
}
return nil
}
// StartFile is called when a file is being processed by a worker.
func (b *Backup) StartFile(filename string) {
select {
case b.workerCh <- fileWorkerMessage{filename: filename}:
case <-b.closed:
}
}
// CompleteBlob is called for all saved blobs for files.
func (b *Backup) CompleteBlob(filename string, bytes uint64) {
select {
case b.processedCh <- counter{Bytes: bytes}:
case <-b.closed:
}
}
func formatPercent(numerator uint64, denominator uint64) string {
if denominator == 0 {
return ""
}
percent := 100.0 * float64(numerator) / float64(denominator)
if percent > 100 {
percent = 100
}
return fmt.Sprintf("%3.2f%%", percent)
}
func formatSeconds(sec uint64) string {
hours := sec / 3600
sec -= hours * 3600
min := sec / 60
sec -= min * 60
if hours > 0 {
return fmt.Sprintf("%d:%02d:%02d", hours, min, sec)
}
return fmt.Sprintf("%d:%02d", min, sec)
}
func formatDuration(d time.Duration) string {
sec := uint64(d / time.Second)
return formatSeconds(sec)
}
func formatBytes(c uint64) string {
b := float64(c)
switch {
case c > 1<<40:
return fmt.Sprintf("%.3f TiB", b/(1<<40))
case c > 1<<30:
return fmt.Sprintf("%.3f GiB", b/(1<<30))
case c > 1<<20:
return fmt.Sprintf("%.3f MiB", b/(1<<20))
case c > 1<<10:
return fmt.Sprintf("%.3f KiB", b/(1<<10))
default:
return fmt.Sprintf("%d B", c)
}
}
// CompleteItem is the status callback function for the archiver when a
// file/dir has been saved successfully.
func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
b.summary.Lock()
b.summary.ItemStats.Add(s)
// for the last item "/", current is nil
if current != nil {
b.summary.ProcessedBytes += current.Size
}
b.summary.Unlock()
if current == nil {
// error occurred, tell the status display to remove the line
select {
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-b.closed:
}
return
}
switch current.Type {
case "file":
select {
case b.processedCh <- counter{Files: 1}:
case <-b.closed:
}
select {
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-b.closed:
}
case "dir":
select {
case b.processedCh <- counter{Dirs: 1}:
case <-b.closed:
}
}
if current.Type == "dir" {
if previous == nil {
b.VV("new %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize))
b.summary.Lock()
b.summary.Dirs.New++
b.summary.Unlock()
return
}
if previous.Equals(*current) {
b.VV("unchanged %v", item)
b.summary.Lock()
b.summary.Dirs.Unchanged++
b.summary.Unlock()
} else {
b.VV("modified %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize))
b.summary.Lock()
b.summary.Dirs.Changed++
b.summary.Unlock()
}
} else if current.Type == "file" {
select {
case b.workerCh <- fileWorkerMessage{done: true, filename: item}:
case <-b.closed:
}
if previous == nil {
b.VV("new %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize))
b.summary.Lock()
b.summary.Files.New++
b.summary.Unlock()
return
}
if previous.Equals(*current) {
b.VV("unchanged %v", item)
b.summary.Lock()
b.summary.Files.Unchanged++
b.summary.Unlock()
} else {
b.VV("modified %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize))
b.summary.Lock()
b.summary.Files.Changed++
b.summary.Unlock()
}
}
}
// ReportTotal sets the total stats up to now
func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
select {
case b.totalCh <- counter{Files: s.Files, Dirs: s.Dirs, Bytes: s.Bytes}:
case <-b.closed:
}
if item == "" {
b.V("scan finished in %.3fs: %v files, %s",
time.Since(b.start).Seconds(),
s.Files, formatBytes(s.Bytes),
)
close(b.totalCh)
return
}
}
// Finish prints the finishing messages.
func (b *Backup) Finish(snapshotID restic.ID) {
// wait for the status update goroutine to shut down
<-b.closed
b.P("\n")
b.P("Files: %5d new, %5d changed, %5d unmodified\n", b.summary.Files.New, b.summary.Files.Changed, b.summary.Files.Unchanged)
b.P("Dirs: %5d new, %5d changed, %5d unmodified\n", b.summary.Dirs.New, b.summary.Dirs.Changed, b.summary.Dirs.Unchanged)
b.V("Data Blobs: %5d new\n", b.summary.ItemStats.DataBlobs)
b.V("Tree Blobs: %5d new\n", b.summary.ItemStats.TreeBlobs)
verb := "Added"
if b.dry {
verb = "Would add"
}
b.P("%s to the repo: %-5s\n", verb, formatBytes(b.summary.ItemStats.DataSize+b.summary.ItemStats.TreeSize))
b.P("\n")
b.P("processed %v files, %v in %s",
b.summary.Files.New+b.summary.Files.Changed+b.summary.Files.Unchanged,
formatBytes(b.summary.ProcessedBytes),
formatDuration(time.Since(b.start)),
)
}
// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the
// ArchiveProgressReporter interface.
func (b *Backup) SetMinUpdatePause(d time.Duration) {
b.MinUpdatePause = d
}
func (b *Backup) SetDryRun() {
b.dry = true
}

244
internal/ui/backup/json.go Normal file
View File

@ -0,0 +1,244 @@
package backup
import (
"bytes"
"encoding/json"
"os"
"sort"
"time"
"github.com/restic/restic/internal/archiver"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui"
"github.com/restic/restic/internal/ui/termstatus"
)
// JSONProgress reports progress for the `backup` command in JSON.
type JSONProgress struct {
*ui.Message
*ui.StdioWrapper
term *termstatus.Terminal
v uint
}
// assert that Backup implements the ProgressPrinter interface
var _ ProgressPrinter = &JSONProgress{}
// NewJSONProgress returns a new backup progress reporter.
func NewJSONProgress(term *termstatus.Terminal, verbosity uint) *JSONProgress {
return &JSONProgress{
Message: ui.NewMessage(term, verbosity),
StdioWrapper: ui.NewStdioWrapper(term),
term: term,
v: verbosity,
}
}
func toJSONString(status interface{}) string {
buf := new(bytes.Buffer)
err := json.NewEncoder(buf).Encode(status)
if err != nil {
panic(err)
}
return buf.String()
}
func (b *JSONProgress) print(status interface{}) {
b.term.Print(toJSONString(status))
}
func (b *JSONProgress) error(status interface{}) {
b.term.Error(toJSONString(status))
}
// Update updates the status lines.
func (b *JSONProgress) Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) {
status := statusUpdate{
MessageType: "status",
SecondsElapsed: uint64(time.Since(start) / time.Second),
SecondsRemaining: secs,
TotalFiles: total.Files,
FilesDone: processed.Files,
TotalBytes: total.Bytes,
BytesDone: processed.Bytes,
ErrorCount: errors,
}
if total.Bytes > 0 {
status.PercentDone = float64(processed.Bytes) / float64(total.Bytes)
}
for filename := range currentFiles {
status.CurrentFiles = append(status.CurrentFiles, filename)
}
sort.Strings(status.CurrentFiles)
b.print(status)
}
// ScannerError is the error callback function for the scanner, it prints the
// error in verbose mode and returns nil.
func (b *JSONProgress) ScannerError(item string, fi os.FileInfo, err error) error {
b.error(errorUpdate{
MessageType: "error",
Error: err,
During: "scan",
Item: item,
})
return nil
}
// Error is the error callback function for the archiver, it prints the error and returns nil.
func (b *JSONProgress) Error(item string, fi os.FileInfo, err error) error {
b.error(errorUpdate{
MessageType: "error",
Error: err,
During: "archival",
Item: item,
})
return nil
}
// CompleteItem is the status callback function for the archiver when a
// file/dir has been saved successfully.
func (b *JSONProgress) CompleteItem(messageType, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
if b.v < 2 {
return
}
switch messageType {
case "dir new":
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "new",
Item: item,
Duration: d.Seconds(),
DataSize: s.DataSize,
MetadataSize: s.TreeSize,
})
case "dir unchanged":
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "unchanged",
Item: item,
})
case "dir modified":
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "modified",
Item: item,
Duration: d.Seconds(),
DataSize: s.DataSize,
MetadataSize: s.TreeSize,
})
case "file new":
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "new",
Item: item,
Duration: d.Seconds(),
DataSize: s.DataSize,
})
case "file unchanged":
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "unchanged",
Item: item,
})
case "file modified":
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "modified",
Item: item,
Duration: d.Seconds(),
DataSize: s.DataSize,
})
}
}
// ReportTotal sets the total stats up to now
func (b *JSONProgress) ReportTotal(item string, start time.Time, s archiver.ScanStats) {
if b.v >= 2 {
b.print(verboseUpdate{
MessageType: "status",
Action: "scan_finished",
Duration: time.Since(start).Seconds(),
DataSize: s.Bytes,
TotalFiles: s.Files,
})
}
}
// Finish prints the finishing messages.
func (b *JSONProgress) Finish(snapshotID restic.ID, start time.Time, summary *Summary, dryRun bool) {
b.print(summaryOutput{
MessageType: "summary",
FilesNew: summary.Files.New,
FilesChanged: summary.Files.Changed,
FilesUnmodified: summary.Files.Unchanged,
DirsNew: summary.Dirs.New,
DirsChanged: summary.Dirs.Changed,
DirsUnmodified: summary.Dirs.Unchanged,
DataBlobs: summary.ItemStats.DataBlobs,
TreeBlobs: summary.ItemStats.TreeBlobs,
DataAdded: summary.ItemStats.DataSize + summary.ItemStats.TreeSize,
TotalFilesProcessed: summary.Files.New + summary.Files.Changed + summary.Files.Unchanged,
TotalBytesProcessed: summary.ProcessedBytes,
TotalDuration: time.Since(start).Seconds(),
SnapshotID: snapshotID.Str(),
DryRun: dryRun,
})
}
// Reset no-op
func (b *JSONProgress) Reset() {
}
type statusUpdate struct {
MessageType string `json:"message_type"` // "status"
SecondsElapsed uint64 `json:"seconds_elapsed,omitempty"`
SecondsRemaining uint64 `json:"seconds_remaining,omitempty"`
PercentDone float64 `json:"percent_done"`
TotalFiles uint64 `json:"total_files,omitempty"`
FilesDone uint64 `json:"files_done,omitempty"`
TotalBytes uint64 `json:"total_bytes,omitempty"`
BytesDone uint64 `json:"bytes_done,omitempty"`
ErrorCount uint `json:"error_count,omitempty"`
CurrentFiles []string `json:"current_files,omitempty"`
}
type errorUpdate struct {
MessageType string `json:"message_type"` // "error"
Error error `json:"error"`
During string `json:"during"`
Item string `json:"item"`
}
type verboseUpdate struct {
MessageType string `json:"message_type"` // "verbose_status"
Action string `json:"action"`
Item string `json:"item"`
Duration float64 `json:"duration"` // in seconds
DataSize uint64 `json:"data_size"`
MetadataSize uint64 `json:"metadata_size"`
TotalFiles uint `json:"total_files"`
}
type summaryOutput struct {
MessageType string `json:"message_type"` // "summary"
FilesNew uint `json:"files_new"`
FilesChanged uint `json:"files_changed"`
FilesUnmodified uint `json:"files_unmodified"`
DirsNew uint `json:"dirs_new"`
DirsChanged uint `json:"dirs_changed"`
DirsUnmodified uint `json:"dirs_unmodified"`
DataBlobs int `json:"data_blobs"`
TreeBlobs int `json:"tree_blobs"`
DataAdded uint64 `json:"data_added"`
TotalFilesProcessed uint `json:"total_files_processed"`
TotalBytesProcessed uint64 `json:"total_bytes_processed"`
TotalDuration float64 `json:"total_duration"` // in seconds
SnapshotID string `json:"snapshot_id"`
DryRun bool `json:"dry_run,omitempty"`
}

View File

@ -0,0 +1,325 @@
package backup
import (
"context"
"io"
"os"
"sync"
"time"
"github.com/restic/restic/internal/archiver"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/signals"
)
type ProgressPrinter interface {
Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64)
Error(item string, fi os.FileInfo, err error) error
ScannerError(item string, fi os.FileInfo, err error) error
CompleteItem(messageType string, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration)
ReportTotal(item string, start time.Time, s archiver.ScanStats)
Finish(snapshotID restic.ID, start time.Time, summary *Summary, dryRun bool)
Reset()
// ui.StdioWrapper
Stdout() io.WriteCloser
Stderr() io.WriteCloser
E(msg string, args ...interface{})
P(msg string, args ...interface{})
V(msg string, args ...interface{})
VV(msg string, args ...interface{})
}
type Counter struct {
Files, Dirs, Bytes uint64
}
type fileWorkerMessage struct {
filename string
done bool
}
type ProgressReporter interface {
CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration)
StartFile(filename string)
CompleteBlob(filename string, bytes uint64)
ScannerError(item string, fi os.FileInfo, err error) error
ReportTotal(item string, s archiver.ScanStats)
SetMinUpdatePause(d time.Duration)
Run(ctx context.Context) error
Error(item string, fi os.FileInfo, err error) error
Finish(snapshotID restic.ID)
}
type Summary struct {
sync.Mutex
Files, Dirs struct {
New uint
Changed uint
Unchanged uint
}
ProcessedBytes uint64
archiver.ItemStats
}
// Progress reports progress for the `backup` command.
type Progress struct {
MinUpdatePause time.Duration
start time.Time
dry bool
totalBytes uint64
totalCh chan Counter
processedCh chan Counter
errCh chan struct{}
workerCh chan fileWorkerMessage
closed chan struct{}
summary *Summary
printer ProgressPrinter
}
func NewProgress(printer ProgressPrinter) *Progress {
return &Progress{
// limit to 60fps by default
MinUpdatePause: time.Second / 60,
start: time.Now(),
totalCh: make(chan Counter),
processedCh: make(chan Counter),
errCh: make(chan struct{}),
workerCh: make(chan fileWorkerMessage),
closed: make(chan struct{}),
summary: &Summary{},
printer: printer,
}
}
// Run regularly updates the status lines. It should be called in a separate
// goroutine.
func (p *Progress) Run(ctx context.Context) error {
var (
lastUpdate time.Time
total, processed Counter
errors uint
started bool
currentFiles = make(map[string]struct{})
secondsRemaining uint64
)
t := time.NewTicker(time.Second)
signalsCh := signals.GetProgressChannel()
defer t.Stop()
defer close(p.closed)
// Reset status when finished
defer p.printer.Reset()
for {
forceUpdate := false
select {
case <-ctx.Done():
return nil
case t, ok := <-p.totalCh:
if ok {
total = t
started = true
} else {
// scan has finished
p.totalCh = nil
p.totalBytes = total.Bytes
}
case s := <-p.processedCh:
processed.Files += s.Files
processed.Dirs += s.Dirs
processed.Bytes += s.Bytes
started = true
case <-p.errCh:
errors++
started = true
case m := <-p.workerCh:
if m.done {
delete(currentFiles, m.filename)
} else {
currentFiles[m.filename] = struct{}{}
}
case <-t.C:
if !started {
continue
}
if p.totalCh == nil {
secs := float64(time.Since(p.start) / time.Second)
todo := float64(total.Bytes - processed.Bytes)
secondsRemaining = uint64(secs / float64(processed.Bytes) * todo)
}
case <-signalsCh:
forceUpdate = true
}
// limit update frequency
if !forceUpdate && (time.Since(lastUpdate) < p.MinUpdatePause || p.MinUpdatePause == 0) {
continue
}
lastUpdate = time.Now()
p.printer.Update(total, processed, errors, currentFiles, p.start, secondsRemaining)
}
}
// ScannerError is the error callback function for the scanner, it prints the
// error in verbose mode and returns nil.
func (p *Progress) ScannerError(item string, fi os.FileInfo, err error) error {
return p.printer.ScannerError(item, fi, err)
}
// Error is the error callback function for the archiver, it prints the error and returns nil.
func (p *Progress) Error(item string, fi os.FileInfo, err error) error {
cbErr := p.printer.Error(item, fi, err)
select {
case p.errCh <- struct{}{}:
case <-p.closed:
}
return cbErr
}
// StartFile is called when a file is being processed by a worker.
func (p *Progress) StartFile(filename string) {
select {
case p.workerCh <- fileWorkerMessage{filename: filename}:
case <-p.closed:
}
}
// CompleteBlob is called for all saved blobs for files.
func (p *Progress) CompleteBlob(filename string, bytes uint64) {
select {
case p.processedCh <- Counter{Bytes: bytes}:
case <-p.closed:
}
}
// CompleteItem is the status callback function for the archiver when a
// file/dir has been saved successfully.
func (p *Progress) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
p.summary.Lock()
p.summary.ItemStats.Add(s)
// for the last item "/", current is nil
if current != nil {
p.summary.ProcessedBytes += current.Size
}
p.summary.Unlock()
if current == nil {
// error occurred, tell the status display to remove the line
select {
case p.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-p.closed:
}
return
}
switch current.Type {
case "file":
select {
case p.processedCh <- Counter{Files: 1}:
case <-p.closed:
}
select {
case p.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-p.closed:
}
case "dir":
select {
case p.processedCh <- Counter{Dirs: 1}:
case <-p.closed:
}
}
if current.Type == "dir" {
if previous == nil {
p.printer.CompleteItem("dir new", item, previous, current, s, d)
p.summary.Lock()
p.summary.Dirs.New++
p.summary.Unlock()
return
}
if previous.Equals(*current) {
p.printer.CompleteItem("dir unchanged", item, previous, current, s, d)
p.summary.Lock()
p.summary.Dirs.Unchanged++
p.summary.Unlock()
} else {
p.printer.CompleteItem("dir modified", item, previous, current, s, d)
p.summary.Lock()
p.summary.Dirs.Changed++
p.summary.Unlock()
}
} else if current.Type == "file" {
select {
case p.workerCh <- fileWorkerMessage{done: true, filename: item}:
case <-p.closed:
}
if previous == nil {
p.printer.CompleteItem("file new", item, previous, current, s, d)
p.summary.Lock()
p.summary.Files.New++
p.summary.Unlock()
return
}
if previous.Equals(*current) {
p.printer.CompleteItem("file unchanged", item, previous, current, s, d)
p.summary.Lock()
p.summary.Files.Unchanged++
p.summary.Unlock()
} else {
p.printer.CompleteItem("file modified", item, previous, current, s, d)
p.summary.Lock()
p.summary.Files.Changed++
p.summary.Unlock()
}
}
}
// ReportTotal sets the total stats up to now
func (p *Progress) ReportTotal(item string, s archiver.ScanStats) {
select {
case p.totalCh <- Counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes}:
case <-p.closed:
}
if item == "" {
p.printer.ReportTotal(item, p.start, s)
close(p.totalCh)
return
}
}
// Finish prints the finishing messages.
func (p *Progress) Finish(snapshotID restic.ID) {
// wait for the status update goroutine to shut down
<-p.closed
p.printer.Finish(snapshotID, p.start, p.summary, p.dry)
}
// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the
// ArchiveProgressReporter interface.
func (p *Progress) SetMinUpdatePause(d time.Duration) {
p.MinUpdatePause = d
}
// SetDryRun marks the backup as a "dry run".
func (p *Progress) SetDryRun() {
p.dry = true
}

188
internal/ui/backup/text.go Normal file
View File

@ -0,0 +1,188 @@
package backup
import (
"fmt"
"os"
"sort"
"time"
"github.com/restic/restic/internal/archiver"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui"
"github.com/restic/restic/internal/ui/termstatus"
)
// TextProgress reports progress for the `backup` command.
type TextProgress struct {
*ui.Message
*ui.StdioWrapper
term *termstatus.Terminal
}
// assert that Backup implements the ProgressPrinter interface
var _ ProgressPrinter = &TextProgress{}
// NewTextProgress returns a new backup progress reporter.
func NewTextProgress(term *termstatus.Terminal, verbosity uint) *TextProgress {
return &TextProgress{
Message: ui.NewMessage(term, verbosity),
StdioWrapper: ui.NewStdioWrapper(term),
term: term,
}
}
// Update updates the status lines.
func (b *TextProgress) Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) {
var status string
if total.Files == 0 && total.Dirs == 0 {
// no total count available yet
status = fmt.Sprintf("[%s] %v files, %s, %d errors",
formatDuration(time.Since(start)),
processed.Files, formatBytes(processed.Bytes), errors,
)
} else {
var eta, percent string
if secs > 0 && processed.Bytes < total.Bytes {
eta = fmt.Sprintf(" ETA %s", formatSeconds(secs))
percent = formatPercent(processed.Bytes, total.Bytes)
percent += " "
}
// include totals
status = fmt.Sprintf("[%s] %s%v files %s, total %v files %v, %d errors%s",
formatDuration(time.Since(start)),
percent,
processed.Files,
formatBytes(processed.Bytes),
total.Files,
formatBytes(total.Bytes),
errors,
eta,
)
}
lines := make([]string, 0, len(currentFiles)+1)
for filename := range currentFiles {
lines = append(lines, filename)
}
sort.Strings(lines)
lines = append([]string{status}, lines...)
b.term.SetStatus(lines)
}
// ScannerError is the error callback function for the scanner, it prints the
// error in verbose mode and returns nil.
func (b *TextProgress) ScannerError(item string, fi os.FileInfo, err error) error {
b.V("scan: %v\n", err)
return nil
}
// Error is the error callback function for the archiver, it prints the error and returns nil.
func (b *TextProgress) Error(item string, fi os.FileInfo, err error) error {
b.E("error: %v\n", err)
return nil
}
func formatPercent(numerator uint64, denominator uint64) string {
if denominator == 0 {
return ""
}
percent := 100.0 * float64(numerator) / float64(denominator)
if percent > 100 {
percent = 100
}
return fmt.Sprintf("%3.2f%%", percent)
}
func formatSeconds(sec uint64) string {
hours := sec / 3600
sec -= hours * 3600
min := sec / 60
sec -= min * 60
if hours > 0 {
return fmt.Sprintf("%d:%02d:%02d", hours, min, sec)
}
return fmt.Sprintf("%d:%02d", min, sec)
}
func formatDuration(d time.Duration) string {
sec := uint64(d / time.Second)
return formatSeconds(sec)
}
func formatBytes(c uint64) string {
b := float64(c)
switch {
case c > 1<<40:
return fmt.Sprintf("%.3f TiB", b/(1<<40))
case c > 1<<30:
return fmt.Sprintf("%.3f GiB", b/(1<<30))
case c > 1<<20:
return fmt.Sprintf("%.3f MiB", b/(1<<20))
case c > 1<<10:
return fmt.Sprintf("%.3f KiB", b/(1<<10))
default:
return fmt.Sprintf("%d B", c)
}
}
// CompleteItem is the status callback function for the archiver when a
// file/dir has been saved successfully.
func (b *TextProgress) CompleteItem(messageType, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
switch messageType {
case "dir new":
b.VV("new %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize))
case "dir unchanged":
b.VV("unchanged %v", item)
case "dir modified":
b.VV("modified %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize))
case "file new":
b.VV("new %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize))
case "file unchanged":
b.VV("unchanged %v", item)
case "file modified":
b.VV("modified %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize))
}
}
// ReportTotal sets the total stats up to now
func (b *TextProgress) ReportTotal(item string, start time.Time, s archiver.ScanStats) {
b.V("scan finished in %.3fs: %v files, %s",
time.Since(start).Seconds(),
s.Files, formatBytes(s.Bytes),
)
}
// Reset status
func (b *TextProgress) Reset() {
if b.term.CanUpdateStatus() {
b.term.SetStatus([]string{""})
}
}
// Finish prints the finishing messages.
func (b *TextProgress) Finish(snapshotID restic.ID, start time.Time, summary *Summary, dryRun bool) {
b.P("\n")
b.P("Files: %5d new, %5d changed, %5d unmodified\n", summary.Files.New, summary.Files.Changed, summary.Files.Unchanged)
b.P("Dirs: %5d new, %5d changed, %5d unmodified\n", summary.Dirs.New, summary.Dirs.Changed, summary.Dirs.Unchanged)
b.V("Data Blobs: %5d new\n", summary.ItemStats.DataBlobs)
b.V("Tree Blobs: %5d new\n", summary.ItemStats.TreeBlobs)
verb := "Added"
if dryRun {
verb = "Would add"
}
b.P("%s to the repo: %-5s\n", verb, formatBytes(summary.ItemStats.DataSize+summary.ItemStats.TreeSize))
b.P("\n")
b.P("processed %v files, %v in %s",
summary.Files.New+summary.Files.Changed+summary.Files.Unchanged,
formatBytes(summary.ProcessedBytes),
formatDuration(time.Since(start)),
)
}

View File

@ -1,468 +0,0 @@
package json
import (
"bytes"
"context"
"encoding/json"
"os"
"sort"
"sync"
"time"
"github.com/restic/restic/internal/archiver"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui"
"github.com/restic/restic/internal/ui/termstatus"
)
type counter struct {
Files, Dirs, Bytes uint64
}
type fileWorkerMessage struct {
filename string
done bool
}
// Backup reports progress for the `backup` command in JSON.
type Backup struct {
*ui.Message
*ui.StdioWrapper
MinUpdatePause time.Duration
term *termstatus.Terminal
v uint
start time.Time
dry bool
totalBytes uint64
totalCh chan counter
processedCh chan counter
errCh chan struct{}
workerCh chan fileWorkerMessage
finished chan struct{}
closed chan struct{}
summary struct {
sync.Mutex
Files, Dirs struct {
New uint
Changed uint
Unchanged uint
}
ProcessedBytes uint64
archiver.ItemStats
}
}
// NewBackup returns a new backup progress reporter.
func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup {
return &Backup{
Message: ui.NewMessage(term, verbosity),
StdioWrapper: ui.NewStdioWrapper(term),
term: term,
v: verbosity,
start: time.Now(),
// limit to 60fps by default
MinUpdatePause: time.Second / 60,
totalCh: make(chan counter),
processedCh: make(chan counter),
errCh: make(chan struct{}),
workerCh: make(chan fileWorkerMessage),
finished: make(chan struct{}),
closed: make(chan struct{}),
}
}
func toJSONString(status interface{}) string {
buf := new(bytes.Buffer)
err := json.NewEncoder(buf).Encode(status)
if err != nil {
panic(err)
}
return buf.String()
}
func (b *Backup) print(status interface{}) {
b.term.Print(toJSONString(status))
}
func (b *Backup) error(status interface{}) {
b.term.Error(toJSONString(status))
}
// Run regularly updates the status lines. It should be called in a separate
// goroutine.
func (b *Backup) Run(ctx context.Context) error {
var (
lastUpdate time.Time
total, processed counter
errors uint
started bool
currentFiles = make(map[string]struct{})
secondsRemaining uint64
)
t := time.NewTicker(time.Second)
defer t.Stop()
defer close(b.closed)
for {
select {
case <-ctx.Done():
return nil
case <-b.finished:
started = false
case t, ok := <-b.totalCh:
if ok {
total = t
started = true
} else {
// scan has finished
b.totalCh = nil
b.totalBytes = total.Bytes
}
case s := <-b.processedCh:
processed.Files += s.Files
processed.Dirs += s.Dirs
processed.Bytes += s.Bytes
started = true
case <-b.errCh:
errors++
started = true
case m := <-b.workerCh:
if m.done {
delete(currentFiles, m.filename)
} else {
currentFiles[m.filename] = struct{}{}
}
case <-t.C:
if !started {
continue
}
if b.totalCh == nil {
secs := float64(time.Since(b.start) / time.Second)
todo := float64(total.Bytes - processed.Bytes)
secondsRemaining = uint64(secs / float64(processed.Bytes) * todo)
}
}
// limit update frequency
if time.Since(lastUpdate) < b.MinUpdatePause {
continue
}
lastUpdate = time.Now()
b.update(total, processed, errors, currentFiles, secondsRemaining)
}
}
// update updates the status lines.
func (b *Backup) update(total, processed counter, errors uint, currentFiles map[string]struct{}, secs uint64) {
status := statusUpdate{
MessageType: "status",
SecondsElapsed: uint64(time.Since(b.start) / time.Second),
SecondsRemaining: secs,
TotalFiles: total.Files,
FilesDone: processed.Files,
TotalBytes: total.Bytes,
BytesDone: processed.Bytes,
ErrorCount: errors,
}
if total.Bytes > 0 {
status.PercentDone = float64(processed.Bytes) / float64(total.Bytes)
}
for filename := range currentFiles {
status.CurrentFiles = append(status.CurrentFiles, filename)
}
sort.Strings(status.CurrentFiles)
b.print(status)
}
// ScannerError is the error callback function for the scanner, it prints the
// error in verbose mode and returns nil.
func (b *Backup) ScannerError(item string, fi os.FileInfo, err error) error {
b.error(errorUpdate{
MessageType: "error",
Error: err,
During: "scan",
Item: item,
})
return nil
}
// Error is the error callback function for the archiver, it prints the error and returns nil.
func (b *Backup) Error(item string, fi os.FileInfo, err error) error {
b.error(errorUpdate{
MessageType: "error",
Error: err,
During: "archival",
Item: item,
})
select {
case b.errCh <- struct{}{}:
case <-b.closed:
}
return nil
}
// StartFile is called when a file is being processed by a worker.
func (b *Backup) StartFile(filename string) {
select {
case b.workerCh <- fileWorkerMessage{filename: filename}:
case <-b.closed:
}
}
// CompleteBlob is called for all saved blobs for files.
func (b *Backup) CompleteBlob(filename string, bytes uint64) {
select {
case b.processedCh <- counter{Bytes: bytes}:
case <-b.closed:
}
}
// CompleteItem is the status callback function for the archiver when a
// file/dir has been saved successfully.
func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
b.summary.Lock()
b.summary.ItemStats.Add(s)
b.summary.Unlock()
if current == nil {
// error occurred, tell the status display to remove the line
select {
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-b.closed:
}
return
}
b.summary.ProcessedBytes += current.Size
switch current.Type {
case "file":
select {
case b.processedCh <- counter{Files: 1}:
case <-b.closed:
}
select {
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-b.closed:
}
case "dir":
select {
case b.processedCh <- counter{Dirs: 1}:
case <-b.closed:
}
}
if current.Type == "dir" {
if previous == nil {
if b.v >= 3 {
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "new",
Item: item,
Duration: d.Seconds(),
DataSize: s.DataSize,
MetadataSize: s.TreeSize,
})
}
b.summary.Lock()
b.summary.Dirs.New++
b.summary.Unlock()
return
}
if previous.Equals(*current) {
if b.v >= 3 {
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "unchanged",
Item: item,
})
}
b.summary.Lock()
b.summary.Dirs.Unchanged++
b.summary.Unlock()
} else {
if b.v >= 3 {
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "modified",
Item: item,
Duration: d.Seconds(),
DataSize: s.DataSize,
MetadataSize: s.TreeSize,
})
}
b.summary.Lock()
b.summary.Dirs.Changed++
b.summary.Unlock()
}
} else if current.Type == "file" {
select {
case b.workerCh <- fileWorkerMessage{done: true, filename: item}:
case <-b.closed:
}
if previous == nil {
if b.v >= 3 {
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "new",
Item: item,
Duration: d.Seconds(),
DataSize: s.DataSize,
})
}
b.summary.Lock()
b.summary.Files.New++
b.summary.Unlock()
return
}
if previous.Equals(*current) {
if b.v >= 3 {
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "unchanged",
Item: item,
})
}
b.summary.Lock()
b.summary.Files.Unchanged++
b.summary.Unlock()
} else {
if b.v >= 3 {
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "modified",
Item: item,
Duration: d.Seconds(),
DataSize: s.DataSize,
})
}
b.summary.Lock()
b.summary.Files.Changed++
b.summary.Unlock()
}
}
}
// ReportTotal sets the total stats up to now
func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
select {
case b.totalCh <- counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes}:
case <-b.closed:
}
if item == "" {
if b.v >= 2 {
b.print(verboseUpdate{
MessageType: "status",
Action: "scan_finished",
Duration: time.Since(b.start).Seconds(),
DataSize: s.Bytes,
TotalFiles: s.Files,
})
}
close(b.totalCh)
return
}
}
// Finish prints the finishing messages.
func (b *Backup) Finish(snapshotID restic.ID) {
select {
case b.finished <- struct{}{}:
case <-b.closed:
}
b.print(summaryOutput{
MessageType: "summary",
FilesNew: b.summary.Files.New,
FilesChanged: b.summary.Files.Changed,
FilesUnmodified: b.summary.Files.Unchanged,
DirsNew: b.summary.Dirs.New,
DirsChanged: b.summary.Dirs.Changed,
DirsUnmodified: b.summary.Dirs.Unchanged,
DataBlobs: b.summary.ItemStats.DataBlobs,
TreeBlobs: b.summary.ItemStats.TreeBlobs,
DataAdded: b.summary.ItemStats.DataSize + b.summary.ItemStats.TreeSize,
TotalFilesProcessed: b.summary.Files.New + b.summary.Files.Changed + b.summary.Files.Unchanged,
TotalBytesProcessed: b.summary.ProcessedBytes,
TotalDuration: time.Since(b.start).Seconds(),
SnapshotID: snapshotID.Str(),
DryRun: b.dry,
})
}
// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the
// ArchiveProgressReporter interface.
func (b *Backup) SetMinUpdatePause(d time.Duration) {
b.MinUpdatePause = d
}
// SetDryRun marks the backup as a "dry run".
func (b *Backup) SetDryRun() {
b.dry = true
}
type statusUpdate struct {
MessageType string `json:"message_type"` // "status"
SecondsElapsed uint64 `json:"seconds_elapsed,omitempty"`
SecondsRemaining uint64 `json:"seconds_remaining,omitempty"`
PercentDone float64 `json:"percent_done"`
TotalFiles uint64 `json:"total_files,omitempty"`
FilesDone uint64 `json:"files_done,omitempty"`
TotalBytes uint64 `json:"total_bytes,omitempty"`
BytesDone uint64 `json:"bytes_done,omitempty"`
ErrorCount uint `json:"error_count,omitempty"`
CurrentFiles []string `json:"current_files,omitempty"`
}
type errorUpdate struct {
MessageType string `json:"message_type"` // "error"
Error error `json:"error"`
During string `json:"during"`
Item string `json:"item"`
}
type verboseUpdate struct {
MessageType string `json:"message_type"` // "verbose_status"
Action string `json:"action"`
Item string `json:"item"`
Duration float64 `json:"duration"` // in seconds
DataSize uint64 `json:"data_size"`
MetadataSize uint64 `json:"metadata_size"`
TotalFiles uint `json:"total_files"`
}
type summaryOutput struct {
MessageType string `json:"message_type"` // "summary"
FilesNew uint `json:"files_new"`
FilesChanged uint `json:"files_changed"`
FilesUnmodified uint `json:"files_unmodified"`
DirsNew uint `json:"dirs_new"`
DirsChanged uint `json:"dirs_changed"`
DirsUnmodified uint `json:"dirs_unmodified"`
DataBlobs int `json:"data_blobs"`
TreeBlobs int `json:"tree_blobs"`
DataAdded uint64 `json:"data_added"`
TotalFilesProcessed uint `json:"total_files_processed"`
TotalBytesProcessed uint64 `json:"total_bytes_processed"`
TotalDuration float64 `json:"total_duration"` // in seconds
SnapshotID string `json:"snapshot_id"`
DryRun bool `json:"dry_run,omitempty"`
}