From 0e78ac92d875e85c43aba9b315baca4be93d1383 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 22 Apr 2018 11:57:20 +0200 Subject: [PATCH] Use new archiver code for backup --- cmd/restic/cmd_backup.go | 589 +++++++++++++---------------- cmd/restic/global.go | 6 +- cmd/restic/integration_test.go | 20 +- cmd/restic/main.go | 14 +- internal/archiver/archiver_test.go | 2 - internal/ui/backup.go | 343 +++++++++++++++++ internal/ui/message.go | 45 +++ internal/ui/stdio_wrapper.go | 86 +++++ internal/ui/stdio_wrapper_test.go | 95 +++++ 9 files changed, 863 insertions(+), 337 deletions(-) create mode 100644 internal/ui/backup.go create mode 100644 internal/ui/message.go create mode 100644 internal/ui/stdio_wrapper.go create mode 100644 internal/ui/stdio_wrapper_test.go diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index 5620e88e9..cf1bb3b1b 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -2,21 +2,24 @@ package main import ( "bufio" - "fmt" + "context" "io" "os" - "path" - "path/filepath" + "strconv" "strings" "time" "github.com/spf13/cobra" + tomb "gopkg.in/tomb.v2" "github.com/restic/restic/internal/archiver" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/fs" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/ui" + "github.com/restic/restic/internal/ui/termstatus" ) var cmdBackup = &cobra.Command{ @@ -42,11 +45,16 @@ given as the arguments. return errors.Fatal("cannot use both `--stdin` and `--files-from -`") } - if backupOptions.Stdin { - return readBackupFromStdin(backupOptions, globalOptions, args) - } + var t tomb.Tomb + term := termstatus.New(globalOptions.stdout, globalOptions.stderr) + t.Go(func() error { term.Run(t.Context(globalOptions.ctx)); return nil }) - return runBackup(backupOptions, globalOptions, args) + err := runBackup(backupOptions, globalOptions, term, args) + if err != nil { + return err + } + t.Kill(nil) + return t.Wait() }, } @@ -90,127 +98,6 @@ func init() { f.BoolVar(&backupOptions.WithAtime, "with-atime", false, "store the atime for all files and directories") } -func newScanProgress(gopts GlobalOptions) *restic.Progress { - if gopts.Quiet { - return nil - } - - p := restic.NewProgress() - p.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) { - if IsProcessBackground() { - return - } - - PrintProgress("[%s] %d directories, %d files, %s", formatDuration(d), s.Dirs, s.Files, formatBytes(s.Bytes)) - } - - p.OnDone = func(s restic.Stat, d time.Duration, ticker bool) { - PrintProgress("scanned %d directories, %d files in %s\n", s.Dirs, s.Files, formatDuration(d)) - } - - return p -} - -func newArchiveProgress(gopts GlobalOptions, todo restic.Stat) *restic.Progress { - if gopts.Quiet { - return nil - } - - archiveProgress := restic.NewProgress() - - var bps, eta uint64 - itemsTodo := todo.Files + todo.Dirs - - archiveProgress.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) { - if IsProcessBackground() { - return - } - - sec := uint64(d / time.Second) - if todo.Bytes > 0 && sec > 0 && ticker { - bps = s.Bytes / sec - if s.Bytes >= todo.Bytes { - eta = 0 - } else if bps > 0 { - eta = (todo.Bytes - s.Bytes) / bps - } - } - - itemsDone := s.Files + s.Dirs - - status1 := fmt.Sprintf("[%s] %s %s / %s %d / %d items %d errors ", - formatDuration(d), - formatPercent(s.Bytes, todo.Bytes), - formatBytes(s.Bytes), formatBytes(todo.Bytes), - itemsDone, itemsTodo, - s.Errors) - status2 := fmt.Sprintf("ETA %s ", formatSeconds(eta)) - - if w := stdoutTerminalWidth(); w > 0 { - maxlen := w - len(status2) - 1 - - if maxlen < 4 { - status1 = "" - } else if len(status1) > maxlen { - status1 = status1[:maxlen-4] - status1 += "... " - } - } - - PrintProgress("%s%s", status1, status2) - } - - archiveProgress.OnDone = func(s restic.Stat, d time.Duration, ticker bool) { - fmt.Printf("\nduration: %s\n", formatDuration(d)) - } - - return archiveProgress -} - -func newArchiveStdinProgress(gopts GlobalOptions) *restic.Progress { - if gopts.Quiet { - return nil - } - - archiveProgress := restic.NewProgress() - - var bps uint64 - - archiveProgress.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) { - if IsProcessBackground() { - return - } - - sec := uint64(d / time.Second) - if s.Bytes > 0 && sec > 0 && ticker { - bps = s.Bytes / sec - } - - status1 := fmt.Sprintf("[%s] %s %s/s", formatDuration(d), - formatBytes(s.Bytes), - formatBytes(bps)) - - if w := stdoutTerminalWidth(); w > 0 { - maxlen := w - len(status1) - - if maxlen < 4 { - status1 = "" - } else if len(status1) > maxlen { - status1 = status1[:maxlen-4] - status1 += "... " - } - } - - PrintProgress("%s", status1) - } - - archiveProgress.OnDone = func(s restic.Stat, d time.Duration, ticker bool) { - fmt.Printf("\nduration: %s\n", formatDuration(d)) - } - - return archiveProgress -} - // filterExisting returns a slice of all existing items, or an error if no // items exist at all. func filterExisting(items []string) (result []string, err error) { @@ -231,72 +118,10 @@ func filterExisting(items []string) (result []string, err error) { return } -func readBackupFromStdin(opts BackupOptions, gopts GlobalOptions, args []string) error { - if len(args) != 0 { - return errors.Fatal("when reading from stdin, no additional files can be specified") - } - - fn := opts.StdinFilename - - if fn == "" { - return errors.Fatal("filename for backup from stdin must not be empty") - } - - if filepath.Base(fn) != fn || path.Base(fn) != fn { - return errors.Fatal("filename is invalid (may not contain a directory, slash or backslash)") - } - - var t time.Time - if opts.TimeStamp != "" { - parsedT, err := time.Parse("2006-01-02 15:04:05", opts.TimeStamp) - if err != nil { - return err - } - t = parsedT - } else { - t = time.Now() - } - - if gopts.password == "" { - return errors.Fatal("unable to read password from stdin when data is to be read from stdin, use --password-file or $RESTIC_PASSWORD") - } - - repo, err := OpenRepository(gopts) - if err != nil { - return err - } - - lock, err := lockRepo(repo) - defer unlockRepo(lock) - if err != nil { - return err - } - - err = repo.LoadIndex(gopts.ctx) - if err != nil { - return err - } - - r := &archiver.Reader{ - Repository: repo, - Tags: opts.Tags, - Hostname: opts.Hostname, - TimeStamp: t, - } - - _, id, err := r.Archive(gopts.ctx, fn, os.Stdin, newArchiveStdinProgress(gopts)) - if err != nil { - return err - } - - Verbosef("archived as %v\n", id.Str()) - return nil -} - -// readFromFile will read all lines from the given filename and write them to a -// string array, if filename is empty readFromFile returns and empty string -// array. If filename is a dash (-), readFromFile will read the lines from -// the standard input. +// readFromFile will read all lines from the given filename and return them as +// a string array, if filename is empty readFromFile returns and empty string +// array. If filename is a dash (-), readFromFile will read the lines from the +// standard input. func readLinesFromFile(filename string) ([]string, error) { if filename == "" { return nil, nil @@ -335,47 +160,45 @@ func readLinesFromFile(filename string) ([]string, error) { return lines, nil } -func runBackup(opts BackupOptions, gopts GlobalOptions, args []string) error { +// Check returns an error when an invalid combination of options was set. +func (opts BackupOptions) Check(gopts GlobalOptions, args []string) error { if opts.FilesFrom == "-" && gopts.password == "" { return errors.Fatal("unable to read password from stdin when data is to be read from stdin, use --password-file or $RESTIC_PASSWORD") } - fromfile, err := readLinesFromFile(opts.FilesFrom) - if err != nil { - return err - } - - // merge files from files-from into normal args so we can reuse the normal - // args checks and have the ability to use both files-from and args at the - // same time - args = append(args, fromfile...) - if len(args) == 0 { - return errors.Fatal("nothing to backup, please specify target files/dirs") - } - - target := make([]string, 0, len(args)) - for _, d := range args { - if a, err := filepath.Abs(d); err == nil { - d = a + if opts.Stdin { + if opts.FilesFrom != "" { + return errors.Fatal("--stdin and --files-from cannot be used together") + } + + if len(args) > 0 { + return errors.Fatal("--stdin was specified and files/dirs were listed as arguments") } - target = append(target, d) } - target, err = filterExisting(target) - if err != nil { - return err - } - - // rejectFuncs collect functions that can reject items from the backup - var rejectFuncs []RejectFunc + return nil +} +// collectRejectFuncs returns a list of all functions which may reject data +// from being saved in a snapshot +func collectRejectFuncs(opts BackupOptions, repo *repository.Repository, targets []string) (fs []RejectFunc, err error) { // allowed devices if opts.ExcludeOtherFS { - f, err := rejectByDevice(target) + f, err := rejectByDevice(targets) if err != nil { - return err + return nil, err } - rejectFuncs = append(rejectFuncs, f) + fs = append(fs, f) + } + + // exclude restic cache + if repo.Cache != nil { + f, err := rejectResticCache(repo) + if err != nil { + return nil, err + } + + fs = append(fs, f) } // add patterns from file @@ -384,7 +207,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, args []string) error { } if len(opts.Excludes) > 0 { - rejectFuncs = append(rejectFuncs, rejectByPattern(opts.Excludes)) + fs = append(fs, rejectByPattern(opts.Excludes)) } if opts.ExcludeCaches { @@ -394,111 +217,17 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, args []string) error { for _, spec := range opts.ExcludeIfPresent { f, err := rejectIfPresent(spec) if err != nil { - return err + return nil, err } - rejectFuncs = append(rejectFuncs, f) + fs = append(fs, f) } - repo, err := OpenRepository(gopts) - if err != nil { - return err - } - - lock, err := lockRepo(repo) - defer unlockRepo(lock) - if err != nil { - return err - } - - // exclude restic cache - if repo.Cache != nil { - f, err := rejectResticCache(repo) - if err != nil { - return err - } - - rejectFuncs = append(rejectFuncs, f) - } - - err = repo.LoadIndex(gopts.ctx) - if err != nil { - return err - } - - var parentSnapshotID *restic.ID - - // Force using a parent - if !opts.Force && opts.Parent != "" { - id, err := restic.FindSnapshot(repo, opts.Parent) - if err != nil { - return errors.Fatalf("invalid id %q: %v", opts.Parent, err) - } - - parentSnapshotID = &id - } - - // Find last snapshot to set it as parent, if not already set - if !opts.Force && parentSnapshotID == nil { - id, err := restic.FindLatestSnapshot(gopts.ctx, repo, target, []restic.TagList{}, opts.Hostname) - if err == nil { - parentSnapshotID = &id - } else if err != restic.ErrNoSnapshotFound { - return err - } - } - - if parentSnapshotID != nil { - Verbosef("using parent snapshot %v\n", parentSnapshotID.Str()) - } - - Verbosef("scan %v\n", target) - - selectFilter := func(item string, fi os.FileInfo) bool { - for _, reject := range rejectFuncs { - if reject(item, fi) { - return false - } - } - return true - } - - var stat restic.Stat - if !gopts.Quiet { - stat, err = archiver.Scan(target, selectFilter, newScanProgress(gopts)) - if err != nil { - return err - } - } - - arch := archiver.New(repo) - arch.Excludes = opts.Excludes - arch.SelectFilter = selectFilter - arch.WithAccessTime = opts.WithAtime - - arch.Warn = func(dir string, fi os.FileInfo, err error) { - // TODO: make ignoring errors configurable - Warnf("%s\rwarning for %s: %v\n", ClearLine(), dir, err) - } - - timeStamp := time.Now() - if opts.TimeStamp != "" { - timeStamp, err = time.Parse(TimeFormat, opts.TimeStamp) - if err != nil { - return errors.Fatalf("error in time option: %v\n", err) - } - } - - _, id, err := arch.Snapshot(gopts.ctx, newArchiveProgress(gopts, stat), target, opts.Tags, opts.Hostname, parentSnapshotID, timeStamp) - if err != nil { - return err - } - - Verbosef("snapshot %s saved\n", id.Str()) - - return nil + return fs, nil } +// readExcludePatternsFromFiles reads all exclude files and returns the list of +// exclude patterns. func readExcludePatternsFromFiles(excludeFiles []string) []string { var excludes []string for _, filename := range excludeFiles { @@ -540,3 +269,217 @@ func readExcludePatternsFromFiles(excludeFiles []string) []string { } return excludes } + +// collectTargets returns a list of target files/dirs from several sources. +func collectTargets(opts BackupOptions, args []string) (targets []string, err error) { + if opts.Stdin { + return nil, nil + } + + fromfile, err := readLinesFromFile(opts.FilesFrom) + if err != nil { + return nil, err + } + + // merge files from files-from into normal args so we can reuse the normal + // args checks and have the ability to use both files-from and args at the + // same time + args = append(args, fromfile...) + if len(args) == 0 && !opts.Stdin { + return nil, errors.Fatal("nothing to backup, please specify target files/dirs") + } + + targets = args + targets, err = filterExisting(targets) + if err != nil { + return nil, err + } + + return targets, nil +} + +// parent returns the ID of the parent snapshot. If there is none, nil is +// returned. +func findParentSnapshot(ctx context.Context, repo restic.Repository, opts BackupOptions, targets []string) (parentID *restic.ID, err error) { + // Force using a parent + if !opts.Force && opts.Parent != "" { + id, err := restic.FindSnapshot(repo, opts.Parent) + if err != nil { + return nil, errors.Fatalf("invalid id %q: %v", opts.Parent, err) + } + + parentID = &id + } + + // Find last snapshot to set it as parent, if not already set + if !opts.Force && parentID == nil { + id, err := restic.FindLatestSnapshot(ctx, repo, targets, []restic.TagList{}, opts.Hostname) + if err == nil { + parentID = &id + } else if err != restic.ErrNoSnapshotFound { + return nil, err + } + } + + return parentID, nil +} + +func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Terminal, args []string) error { + err := opts.Check(gopts, args) + if err != nil { + return err + } + + targets, err := collectTargets(opts, args) + if err != nil { + return err + } + + var t tomb.Tomb + + p := ui.NewBackup(term, gopts.verbosity) + + // use the terminal for stdout/stderr + prevStdout, prevStderr := gopts.stdout, gopts.stderr + defer func() { + gopts.stdout, gopts.stderr = prevStdout, prevStderr + }() + gopts.stdout, gopts.stderr = p.Stdout(), p.Stderr() + + if s, ok := os.LookupEnv("RESTIC_PROGRESS_FPS"); ok { + fps, err := strconv.Atoi(s) + if err == nil && fps >= 1 { + if fps > 60 { + fps = 60 + } + p.MinUpdatePause = time.Second / time.Duration(fps) + } + } + + t.Go(func() error { return p.Run(t.Context(gopts.ctx)) }) + + p.V("open repository") + repo, err := OpenRepository(gopts) + if err != nil { + return err + } + + p.V("lock repository") + lock, err := lockRepo(repo) + defer unlockRepo(lock) + if err != nil { + return err + } + + // rejectFuncs collect functions that can reject items from the backup + rejectFuncs, err := collectRejectFuncs(opts, repo, targets) + if err != nil { + return err + } + + p.V("load index files") + err = repo.LoadIndex(gopts.ctx) + if err != nil { + return err + } + + parentSnapshotID, err := findParentSnapshot(gopts.ctx, repo, opts, targets) + if err != nil { + return err + } + + if parentSnapshotID != nil { + p.V("using parent snapshot %v\n", parentSnapshotID.Str()) + } + + selectFilter := func(item string, fi os.FileInfo) bool { + for _, reject := range rejectFuncs { + if reject(item, fi) { + return false + } + } + return true + } + + timeStamp := time.Now() + if opts.TimeStamp != "" { + timeStamp, err = time.Parse(TimeFormat, opts.TimeStamp) + if err != nil { + return errors.Fatalf("error in time option: %v\n", err) + } + } + + var targetFS fs.FS = fs.Local{} + if opts.Stdin { + p.V("read data from stdin") + targetFS = &fs.Reader{ + ModTime: timeStamp, + Name: opts.StdinFilename, + Mode: 0644, + ReadCloser: os.Stdin, + } + targets = []string{opts.StdinFilename} + } + + sc := archiver.NewScanner(targetFS) + sc.Select = selectFilter + sc.Error = p.ScannerError + sc.Result = p.ReportTotal + + p.V("start scan") + t.Go(func() error { return sc.Scan(t.Context(gopts.ctx), targets) }) + + arch := archiver.New(repo, targetFS, archiver.Options{}) + arch.Select = selectFilter + arch.WithAtime = opts.WithAtime + arch.Error = p.Error + arch.CompleteItem = p.CompleteItemFn + arch.StartFile = p.StartFile + arch.CompleteBlob = p.CompleteBlob + + if parentSnapshotID == nil { + parentSnapshotID = &restic.ID{} + } + + snapshotOpts := archiver.SnapshotOptions{ + Excludes: opts.Excludes, + Tags: opts.Tags, + Time: timeStamp, + Hostname: opts.Hostname, + ParentSnapshot: *parentSnapshotID, + } + + uploader := archiver.IndexUploader{ + Repository: repo, + Start: func() { + p.VV("uploading intermediate index") + }, + Complete: func(id restic.ID) { + p.V("uploaded intermediate index %v", id.Str()) + }, + } + + t.Go(func() error { + return uploader.Upload(gopts.ctx, t.Context(gopts.ctx), 30*time.Second) + }) + + p.V("start backup") + _, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts) + if err != nil { + return err + } + + p.Finish() + p.P("snapshot %s saved\n", id.Str()) + + // cleanly shutdown all running goroutines + t.Kill(nil) + + // let's see if one returned an error + err = t.Wait() + if err != nil { + return err + } + + return nil +} diff --git a/cmd/restic/global.go b/cmd/restic/global.go index 627e4c8f9..8c89a4d80 100644 --- a/cmd/restic/global.go +++ b/cmd/restic/global.go @@ -43,8 +43,7 @@ type GlobalOptions struct { Repo string PasswordFile string Quiet bool - Verbose bool - Debug bool + Verbose int NoLock bool JSON bool CacheDir string @@ -90,8 +89,7 @@ func init() { f.StringVarP(&globalOptions.Repo, "repo", "r", os.Getenv("RESTIC_REPOSITORY"), "repository to backup to or restore from (default: $RESTIC_REPOSITORY)") f.StringVarP(&globalOptions.PasswordFile, "password-file", "p", os.Getenv("RESTIC_PASSWORD_FILE"), "read the repository password from a file (default: $RESTIC_PASSWORD_FILE)") f.BoolVarP(&globalOptions.Quiet, "quiet", "q", false, "do not output comprehensive progress report") - f.BoolVarP(&globalOptions.Verbose, "verbose", "v", false, "be verbose") - f.BoolVar(&globalOptions.Debug, "debug", false, "be very verbose") + f.CountVarP(&globalOptions.Verbose, "verbose", "v", "be verbose (specify --verbose multiple times or level `n`)") f.BoolVar(&globalOptions.NoLock, "no-lock", false, "do not lock the repo, this allows some operations on read-only repos") f.BoolVarP(&globalOptions.JSON, "json", "", false, "set output mode to JSON for commands that support it") f.StringVar(&globalOptions.CacheDir, "cache-dir", "", "set the cache directory") diff --git a/cmd/restic/integration_test.go b/cmd/restic/integration_test.go index 88c154f5c..36a7670b1 100644 --- a/cmd/restic/integration_test.go +++ b/cmd/restic/integration_test.go @@ -3,6 +3,7 @@ package main import ( "bufio" "bytes" + "context" "crypto/rand" "encoding/json" "fmt" @@ -23,6 +24,8 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" + "github.com/restic/restic/internal/ui/termstatus" + "golang.org/x/sync/errgroup" ) func parseIDsFromReader(t testing.TB, rd io.Reader) restic.IDs { @@ -52,13 +55,28 @@ func testRunInit(t testing.TB, opts GlobalOptions) { } func testRunBackup(t testing.TB, dir string, target []string, opts BackupOptions, gopts GlobalOptions) { + ctx, cancel := context.WithCancel(gopts.ctx) + defer cancel() + + var wg errgroup.Group + term := termstatus.New(gopts.stdout, gopts.stderr) + wg.Go(func() error { term.Run(ctx); return nil }) + gopts.stdout = ioutil.Discard t.Logf("backing up %v in %v", target, dir) if dir != "" { cleanup := fs.TestChdir(t, dir) defer cleanup() } - rtest.OK(t, runBackup(opts, gopts, target)) + + rtest.OK(t, runBackup(opts, gopts, term, target)) + + cancel() + + err := wg.Wait() + if err != nil { + t.Fatal(err) + } } func testRunList(t testing.TB, tpe string, opts GlobalOptions) restic.IDs { diff --git a/cmd/restic/main.go b/cmd/restic/main.go index c1a42bd90..01a902b1d 100644 --- a/cmd/restic/main.go +++ b/cmd/restic/main.go @@ -30,19 +30,19 @@ directories in an encrypted repository stored on different backends. DisableAutoGenTag: true, PersistentPreRunE: func(c *cobra.Command, args []string) error { - // set verbosity + // set verbosity, default is one globalOptions.verbosity = 1 - if globalOptions.Quiet && (globalOptions.Verbose || globalOptions.Debug) { - return errors.Fatal("--quiet and --verbose or --debug cannot be specified at the same time") + if globalOptions.Quiet && (globalOptions.Verbose > 1) { + return errors.Fatal("--quiet and --verbose cannot be specified at the same time") } switch { + case globalOptions.Verbose >= 2: + globalOptions.verbosity = 3 + case globalOptions.Verbose > 0: + globalOptions.verbosity = 2 case globalOptions.Quiet: globalOptions.verbosity = 0 - case globalOptions.Verbose: - globalOptions.verbosity = 2 - case globalOptions.Debug: - globalOptions.verbosity = 3 } // parse extended options diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index 3f4daf5ec..a8557ef2a 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -2,7 +2,6 @@ package archiver import ( "context" - "fmt" "io/ioutil" "os" "path/filepath" @@ -274,7 +273,6 @@ func (repo *blobCountingRepo) SaveTree(ctx context.Context, t *restic.Tree) (res repo.m.Lock() repo.saved[h]++ repo.m.Unlock() - fmt.Printf("savetree %v", h) return id, err } diff --git a/internal/ui/backup.go b/internal/ui/backup.go new file mode 100644 index 000000000..ebd56b8bc --- /dev/null +++ b/internal/ui/backup.go @@ -0,0 +1,343 @@ +package ui + +import ( + "context" + "fmt" + "os" + "sort" + "sync" + "time" + + "github.com/restic/restic/internal/archiver" + "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/ui/termstatus" +) + +type counter struct { + Files, Dirs uint + Bytes uint64 +} + +type fileWorkerMessage struct { + filename string + done bool +} + +// Backup reports progress for the `backup` command. +type Backup struct { + *Message + *StdioWrapper + + MinUpdatePause time.Duration + + term *termstatus.Terminal + v uint + start time.Time + + totalBytes uint64 + + totalCh chan counter + processedCh chan counter + errCh chan struct{} + workerCh chan fileWorkerMessage + + summary struct { + sync.Mutex + Files, Dirs struct { + New uint + Changed uint + Unchanged uint + } + archiver.ItemStats + } +} + +// NewBackup returns a new backup progress reporter. +func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup { + return &Backup{ + Message: NewMessage(term, verbosity), + StdioWrapper: NewStdioWrapper(term), + term: term, + v: verbosity, + start: time.Now(), + + // limit to 60fps by default + MinUpdatePause: time.Second / 60, + + totalCh: make(chan counter), + processedCh: make(chan counter), + errCh: make(chan struct{}), + workerCh: make(chan fileWorkerMessage), + } +} + +// Run regularly updates the status lines. It should be called in a separate +// goroutine. +func (b *Backup) Run(ctx context.Context) error { + var ( + lastUpdate time.Time + total, processed counter + errors uint + started bool + currentFiles = make(map[string]struct{}) + secondsRemaining uint64 + ) + + t := time.NewTicker(time.Second) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case t, ok := <-b.totalCh: + if ok { + total = t + started = true + } else { + // scan has finished + b.totalCh = nil + b.totalBytes = total.Bytes + } + case s := <-b.processedCh: + processed.Files += s.Files + processed.Dirs += s.Dirs + processed.Bytes += s.Bytes + started = true + case <-b.errCh: + errors++ + started = true + case m := <-b.workerCh: + if m.done { + delete(currentFiles, m.filename) + } else { + currentFiles[m.filename] = struct{}{} + } + case <-t.C: + if !started { + continue + } + + if b.totalCh == nil { + secs := float64(time.Since(b.start) / time.Second) + todo := float64(total.Bytes - processed.Bytes) + secondsRemaining = uint64(secs / float64(processed.Bytes) * todo) + } + } + + // limit update frequency + if time.Since(lastUpdate) < b.MinUpdatePause { + continue + } + lastUpdate = time.Now() + + b.update(total, processed, errors, currentFiles, secondsRemaining) + } +} + +// update updates the status lines. +func (b *Backup) update(total, processed counter, errors uint, currentFiles map[string]struct{}, secs uint64) { + var status string + if total.Files == 0 && total.Dirs == 0 { + // no total count available yet + status = fmt.Sprintf("[%s] %v files, %s, %d errors", + formatDuration(time.Since(b.start)), + processed.Files, formatBytes(processed.Bytes), errors, + ) + } else { + var eta string + + if secs > 0 { + eta = fmt.Sprintf(" ETA %s", formatSeconds(secs)) + } + + // include totals + status = fmt.Sprintf("[%s] %s %v files %s, total %v files %v, %d errors%s", + formatDuration(time.Since(b.start)), + formatPercent(processed.Bytes, total.Bytes), + processed.Files, + formatBytes(processed.Bytes), + total.Files, + formatBytes(total.Bytes), + errors, + eta, + ) + } + + lines := make([]string, 0, len(currentFiles)+1) + for filename := range currentFiles { + lines = append(lines, filename) + } + sort.Sort(sort.StringSlice(lines)) + lines = append([]string{status}, lines...) + + b.term.SetStatus(lines) +} + +// ScannerError is the error callback function for the scanner, it prints the +// error in verbose mode and returns nil. +func (b *Backup) ScannerError(item string, fi os.FileInfo, err error) error { + b.V("scan: %v\n", err) + return nil +} + +// Error is the error callback function for the archiver, it prints the error and returns nil. +func (b *Backup) Error(item string, fi os.FileInfo, err error) error { + b.E("error: %v\n", err) + b.errCh <- struct{}{} + return nil +} + +// StartFile is called when a file is being processed by a worker. +func (b *Backup) StartFile(filename string) { + b.workerCh <- fileWorkerMessage{ + filename: filename, + } +} + +// CompleteBlob is called for all saved blobs for files. +func (b *Backup) CompleteBlob(filename string, bytes uint64) { + b.processedCh <- counter{Bytes: bytes} +} + +func formatPercent(numerator uint64, denominator uint64) string { + if denominator == 0 { + return "" + } + + percent := 100.0 * float64(numerator) / float64(denominator) + + if percent > 100 { + percent = 100 + } + + return fmt.Sprintf("%3.2f%%", percent) +} + +func formatSeconds(sec uint64) string { + hours := sec / 3600 + sec -= hours * 3600 + min := sec / 60 + sec -= min * 60 + if hours > 0 { + return fmt.Sprintf("%d:%02d:%02d", hours, min, sec) + } + + return fmt.Sprintf("%d:%02d", min, sec) +} + +func formatDuration(d time.Duration) string { + sec := uint64(d / time.Second) + return formatSeconds(sec) +} + +func formatBytes(c uint64) string { + b := float64(c) + switch { + case c > 1<<40: + return fmt.Sprintf("%.3f TiB", b/(1<<40)) + case c > 1<<30: + return fmt.Sprintf("%.3f GiB", b/(1<<30)) + case c > 1<<20: + return fmt.Sprintf("%.3f MiB", b/(1<<20)) + case c > 1<<10: + return fmt.Sprintf("%.3f KiB", b/(1<<10)) + default: + return fmt.Sprintf("%d B", c) + } +} + +// CompleteItemFn is the status callback function for the archiver when a +// file/dir has been saved successfully. +func (b *Backup) CompleteItemFn(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) { + b.summary.Lock() + b.summary.ItemStats.Add(s) + b.summary.Unlock() + + if current == nil { + return + } + + switch current.Type { + case "file": + b.processedCh <- counter{Files: 1} + b.workerCh <- fileWorkerMessage{ + filename: item, + done: true, + } + case "dir": + b.processedCh <- counter{Dirs: 1} + } + + if current.Type == "dir" { + if previous == nil { + b.VV("new %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize)) + b.summary.Lock() + b.summary.Dirs.New++ + b.summary.Unlock() + return + } + + if previous.Equals(*current) { + b.VV("unchanged %v", item) + b.summary.Lock() + b.summary.Dirs.Unchanged++ + b.summary.Unlock() + } else { + b.VV("modified %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize)) + b.summary.Lock() + b.summary.Dirs.Changed++ + b.summary.Unlock() + } + + } else if current.Type == "file" { + + b.workerCh <- fileWorkerMessage{ + done: true, + filename: item, + } + + if previous == nil { + b.VV("new %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize)) + b.summary.Lock() + b.summary.Files.New++ + b.summary.Unlock() + return + } + + if previous.Equals(*current) { + b.VV("unchanged %v", item) + b.summary.Lock() + b.summary.Files.Unchanged++ + b.summary.Unlock() + } else { + b.VV("modified %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize)) + b.summary.Lock() + b.summary.Files.Changed++ + b.summary.Unlock() + } + } +} + +// ReportTotal sets the total stats up to now +func (b *Backup) ReportTotal(item string, s archiver.ScanStats) { + b.totalCh <- counter{Files: s.Files, Dirs: s.Dirs, Bytes: s.Bytes} + + if item == "" { + b.V("scan finished in %.3fs", time.Since(b.start).Seconds()) + close(b.totalCh) + return + } +} + +// Finish prints the finishing messages. +func (b *Backup) Finish() { + b.V("processed %s in %s", formatBytes(b.totalBytes), formatDuration(time.Since(b.start))) + b.V("\n") + b.V("Files: %5d new, %5d changed, %5d unmodified\n", b.summary.Files.New, b.summary.Files.Changed, b.summary.Files.Unchanged) + b.V("Dirs: %5d new, %5d changed, %5d unmodified\n", b.summary.Dirs.New, b.summary.Dirs.Changed, b.summary.Dirs.Unchanged) + b.VV("Data Blobs: %5d new\n", b.summary.ItemStats.DataBlobs) + b.VV("Tree Blobs: %5d new\n", b.summary.ItemStats.TreeBlobs) + b.V("Added: %-5s\n", formatBytes(b.summary.ItemStats.DataSize+b.summary.ItemStats.TreeSize)) + b.V("\n") +} diff --git a/internal/ui/message.go b/internal/ui/message.go new file mode 100644 index 000000000..75e54b019 --- /dev/null +++ b/internal/ui/message.go @@ -0,0 +1,45 @@ +package ui + +import "github.com/restic/restic/internal/ui/termstatus" + +// Message reports progress with messages of different verbosity. +type Message struct { + term *termstatus.Terminal + v uint +} + +// NewMessage returns a message progress reporter with underlying terminal +// term. +func NewMessage(term *termstatus.Terminal, verbosity uint) *Message { + return &Message{ + term: term, + v: verbosity, + } +} + +// E reports an error +func (m *Message) E(msg string, args ...interface{}) { + m.term.Errorf(msg, args...) +} + +// P prints a message if verbosity >= 1, this is used for normal messages which +// are not errors. +func (m *Message) P(msg string, args ...interface{}) { + if m.v >= 1 { + m.term.Printf(msg, args...) + } +} + +// V prints a message if verbosity >= 2, this is used for verbose messages. +func (m *Message) V(msg string, args ...interface{}) { + if m.v >= 2 { + m.term.Printf(msg, args...) + } +} + +// VV prints a message if verbosity >= 3, this is used for debug messages. +func (m *Message) VV(msg string, args ...interface{}) { + if m.v >= 3 { + m.term.Printf(msg, args...) + } +} diff --git a/internal/ui/stdio_wrapper.go b/internal/ui/stdio_wrapper.go new file mode 100644 index 000000000..eccaefb7b --- /dev/null +++ b/internal/ui/stdio_wrapper.go @@ -0,0 +1,86 @@ +package ui + +import ( + "bytes" + "io" + + "github.com/restic/restic/internal/ui/termstatus" +) + +// StdioWrapper provides stdout and stderr integration with termstatus. +type StdioWrapper struct { + stdout *lineWriter + stderr *lineWriter +} + +// NewStdioWrapper initializes a new stdio wrapper that can be used in place of +// os.Stdout or os.Stderr. +func NewStdioWrapper(term *termstatus.Terminal) *StdioWrapper { + return &StdioWrapper{ + stdout: newLineWriter(term.Print), + stderr: newLineWriter(term.Error), + } +} + +// Stdout returns a writer that is line buffered and can be used in place of +// os.Stdout. On Close(), the remaining bytes are written, followed by a line +// break. +func (w *StdioWrapper) Stdout() io.WriteCloser { + return w.stdout +} + +// Stderr returns a writer that is line buffered and can be used in place of +// os.Stderr. On Close(), the remaining bytes are written, followed by a line +// break. +func (w *StdioWrapper) Stderr() io.WriteCloser { + return w.stderr +} + +type lineWriter struct { + buf *bytes.Buffer + print func(string) +} + +var _ io.WriteCloser = &lineWriter{} + +func newLineWriter(print func(string)) *lineWriter { + return &lineWriter{buf: bytes.NewBuffer(nil), print: print} +} + +func (w *lineWriter) Write(data []byte) (n int, err error) { + n, err = w.buf.Write(data) + if err != nil { + return n, err + } + + // look for line breaks + buf := w.buf.Bytes() + skip := 0 + for i := 0; i < len(buf); { + if buf[i] == '\n' { + // found line + w.print(string(buf[:i+1])) + buf = buf[i+1:] + skip += i + 1 + i = 0 + continue + } + + i++ + } + + _ = w.buf.Next(skip) + + return n, err +} + +func (w *lineWriter) Flush() error { + if w.buf.Len() > 0 { + w.print(string(append(w.buf.Bytes(), '\n'))) + } + return nil +} + +func (w *lineWriter) Close() error { + return w.Flush() +} diff --git a/internal/ui/stdio_wrapper_test.go b/internal/ui/stdio_wrapper_test.go new file mode 100644 index 000000000..fc071f992 --- /dev/null +++ b/internal/ui/stdio_wrapper_test.go @@ -0,0 +1,95 @@ +package ui + +import ( + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestStdioWrapper(t *testing.T) { + var tests = []struct { + inputs [][]byte + outputs []string + }{ + { + inputs: [][]byte{ + []byte("foo"), + }, + outputs: []string{ + "foo\n", + }, + }, + { + inputs: [][]byte{ + []byte("foo"), + []byte("bar"), + []byte("\n"), + []byte("baz"), + }, + outputs: []string{ + "foobar\n", + "baz\n", + }, + }, + { + inputs: [][]byte{ + []byte("foo"), + []byte("bar\nbaz\n"), + []byte("bump\n"), + }, + outputs: []string{ + "foobar\n", + "baz\n", + "bump\n", + }, + }, + { + inputs: [][]byte{ + []byte("foo"), + []byte("bar\nbaz\n"), + []byte("bum"), + []byte("p\nx"), + []byte("x"), + []byte("x"), + []byte("z"), + }, + outputs: []string{ + "foobar\n", + "baz\n", + "bump\n", + "xxxz\n", + }, + }, + } + + for _, test := range tests { + t.Run("", func(t *testing.T) { + var lines []string + print := func(s string) { + lines = append(lines, s) + } + + w := newLineWriter(print) + + for _, data := range test.inputs { + n, err := w.Write(data) + if err != nil { + t.Fatal(err) + } + + if n != len(data) { + t.Errorf("invalid length returned by Write, want %d, got %d", len(data), n) + } + } + + err := w.Close() + if err != nil { + t.Fatal(err) + } + + if !cmp.Equal(test.outputs, lines) { + t.Error(cmp.Diff(test.outputs, lines)) + } + }) + } +}