From 4b70bba588304ee254a5c580443ada7dd277e1e1 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 4 Jan 2015 18:23:00 +0100 Subject: [PATCH] Split Scanner from Archiver, refactor Progress --- archiver.go | 175 +++------------------------------------ cmd/restic/cmd_backup.go | 119 +++++++++++--------------- progress.go | 165 ++++++++++++++++++++++++++++++++++++ scanner.go | 121 +++++++++++++++++++++++++++ tree.go | 1 + 5 files changed, 344 insertions(+), 237 deletions(-) create mode 100644 progress.go create mode 100644 scanner.go diff --git a/archiver.go b/archiver.go index 2c4360f27..af1e4a42d 100644 --- a/archiver.go +++ b/archiver.go @@ -4,9 +4,7 @@ import ( "fmt" "io" "os" - "path/filepath" "sync" - "time" "github.com/juju/arrar" "github.com/restic/restic/backend" @@ -16,8 +14,6 @@ import ( const ( maxConcurrentFiles = 8 maxConcurrentBlobs = 8 - - statTimeout = 20 * time.Millisecond ) type Archiver struct { @@ -29,36 +25,17 @@ type Archiver struct { fileToken chan struct{} blobToken chan struct{} - Stats Stats - Error func(dir string, fi os.FileInfo, err error) error Filter func(item string, fi os.FileInfo) bool - ScannerStats chan Stats - SaveStats chan Stats - - statsMutex sync.Mutex - updateStats Stats + p *Progress } -type Stats struct { - Files int - Directories int - Other int - Bytes uint64 -} - -func (s *Stats) Add(other Stats) { - s.Bytes += other.Bytes - s.Directories += other.Directories - s.Files += other.Files - s.Other += other.Other -} - -func NewArchiver(s Server) (*Archiver, error) { +func NewArchiver(s Server, p *Progress) (*Archiver, error) { var err error arch := &Archiver{ s: s, + p: p, fileToken: make(chan struct{}, maxConcurrentFiles), blobToken: make(chan struct{}, maxConcurrentBlobs), } @@ -92,34 +69,6 @@ func NewArchiver(s Server) (*Archiver, error) { return arch, nil } -func (arch *Archiver) update(ch chan Stats, stats Stats) { - if ch == nil { - return - } - - // load old stats from global state - arch.statsMutex.Lock() - 
stats.Add(arch.updateStats) - arch.updateStats = Stats{} - arch.statsMutex.Unlock() - - // try to send stats through the channel, with a timeout - timeout := time.After(statTimeout) - - select { - case ch <- stats: - break - case _ = <-timeout: - - // save cumulated stats to global state - arch.statsMutex.Lock() - arch.updateStats.Add(stats) - arch.statsMutex.Unlock() - - break - } -} - func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) { blob, err := arch.ch.Save(t, data) if err != nil { @@ -179,7 +128,7 @@ func (arch *Archiver) SaveFile(node *Node) error { return arrar.Annotate(err, "SaveFile() save chunk") } - arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) + arch.p.Report(Stat{Bytes: blob.Size}) blobs = Blobs{blob} } @@ -219,7 +168,7 @@ func (arch *Archiver) SaveFile(node *Node) error { FreeChunkBuf("blob chunker", buf) - arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) + arch.p.Report(Stat{Bytes: blob.Size}) arch.blobToken <- token ch <- blob }(resCh) @@ -253,110 +202,6 @@ func (arch *Archiver) SaveFile(node *Node) error { return nil } -func (arch *Archiver) scan(dir string) (*Tree, error) { - var err error - - // open and list path - fd, err := os.Open(dir) - defer fd.Close() - if err != nil { - return nil, arch.Error(dir, nil, err) - } - - entries, err := fd.Readdir(-1) - if err != nil { - return nil, err - } - - // build new tree - tree := Tree{} - for _, entry := range entries { - path := filepath.Join(dir, entry.Name()) - - if !arch.Filter(path, entry) { - continue - } - - node, err := NodeFromFileInfo(path, entry) - if err != nil { - // TODO: error processing - return nil, err - } - - err = tree.Insert(node) - if err != nil { - return nil, err - } - - if entry.IsDir() { - node.Tree, err = arch.scan(path) - if err != nil { - return nil, err - } - } - } - - for _, node := range tree { - if node.Type == "file" && node.Content != nil { - continue - } - - switch node.Type { - case "file": - arch.Stats.Files++ - arch.Stats.Bytes += 
node.Size - case "dir": - arch.Stats.Directories++ - default: - arch.Stats.Other++ - } - } - - arch.update(arch.ScannerStats, arch.Stats) - - return &tree, nil -} - -func (arch *Archiver) Scan(path string) (*Tree, error) { - // reset global stats - arch.updateStats = Stats{} - - fi, err := os.Lstat(path) - if err != nil { - return nil, arrar.Annotatef(err, "Lstat(%q)", path) - } - - node, err := NodeFromFileInfo(path, fi) - if err != nil { - return nil, arrar.Annotate(err, "NodeFromFileInfo()") - } - - if node.Type != "dir" { - t := &Tree{node} - - // update stats - if node.Content == nil && node.Subtree == nil { - arch.Stats.Files = 1 - arch.Stats.Bytes = node.Size - } - - arch.update(arch.ScannerStats, arch.Stats) - - return t, nil - } - - arch.Stats.Directories = 1 - - node.Tree, err = arch.scan(path) - if err != nil { - return nil, arrar.Annotate(err, "loadTree()") - } - - arch.update(arch.ScannerStats, arch.Stats) - - return &Tree{node}, nil -} - func (arch *Archiver) saveTree(t *Tree) (Blob, error) { var wg sync.WaitGroup @@ -367,7 +212,7 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { return Blob{}, err } node.Subtree = b.ID - arch.update(arch.SaveStats, Stats{Directories: 1}) + arch.p.Report(Stat{Dirs: 1}) } else if node.Type == "file" && len(node.Content) == 0 { // get token token := <-arch.fileToken @@ -385,10 +230,10 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { if err != nil { panic(err) } - arch.update(arch.SaveStats, Stats{Files: 1}) + arch.p.Report(Stat{Files: 1}) }(node) } else { - arch.update(arch.SaveStats, Stats{Other: 1}) + arch.p.Report(Stat{Other: 1}) } } @@ -410,8 +255,8 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { } func (arch *Archiver) Snapshot(dir string, t *Tree, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) { - // reset global stats - arch.updateStats = Stats{} + arch.p.Start() + defer arch.p.Done() sn, err := NewSnapshot(dir) if err != nil { diff --git a/cmd/restic/cmd_backup.go 
b/cmd/restic/cmd_backup.go index 52226c76e..ebcc5c0fe 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -40,7 +40,7 @@ func format_bytes(c uint64) string { } } -func format_duration(sec uint64) string { +func format_seconds(sec uint64) string { hours := sec / 3600 sec -= hours * 3600 min := sec / 60 @@ -52,6 +52,11 @@ func format_duration(sec uint64) string { return fmt.Sprintf("%d:%02d", min, sec) } +func format_duration(d time.Duration) string { + sec := uint64(d / time.Second) + return format_seconds(sec) +} + func print_tree2(indent int, t *restic.Tree) { for _, node := range *t { if node.Tree != nil { @@ -89,27 +94,16 @@ func (cmd CmdBackup) Execute(args []string) error { fmt.Printf("found parent snapshot %v\n", parentSnapshotID) } - arch, err := restic.NewArchiver(s) - if err != nil { - fmt.Fprintf(os.Stderr, "err: %v\n", err) - } - arch.Error = func(dir string, fi os.FileInfo, err error) error { - // TODO: make ignoring errors configurable - fmt.Fprintf(os.Stderr, "\nerror for %s: %v\n%v\n", dir, err, fi) - return nil - } - fmt.Printf("scanning %s\n", target) + scanProgress := restic.NewProgress(time.Second) if terminal.IsTerminal(int(os.Stdout.Fd())) { - ch := make(chan restic.Stats, 20) - arch.ScannerStats = ch - - go func(ch <-chan restic.Stats) { - for stats := range ch { - fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes)) - } - }(ch) + scanProgress.F = func(s restic.Stat, d time.Duration, ticker bool) { + fmt.Printf("\x1b[2K\r[%s] %d directories, %d files, %s", format_duration(d), s.Dirs, s.Files, format_bytes(s.Bytes)) + } + scanProgress.D = func(s restic.Stat, d time.Duration, ticker bool) { + fmt.Printf("\nDone in %s\n", format_duration(d)) + } } // TODO: add filter @@ -117,59 +111,51 @@ func (cmd CmdBackup) Execute(args []string) error { // return true // } - t, err := arch.Scan(target) + sc := restic.NewScanner(scanProgress) + + t, err := sc.Scan(target) if err != nil { 
fmt.Fprintf(os.Stderr, "error: %v\n", err) return err } - fmt.Printf("\r%6d directories, %6d files, %14s\n", arch.Stats.Directories, arch.Stats.Files, format_bytes(arch.Stats.Bytes)) + archiveProgress := restic.NewProgress(time.Second) + targetStat := scanProgress.Current() - stats := restic.Stats{} - start := time.Now() if terminal.IsTerminal(int(os.Stdout.Fd())) { - ch := make(chan restic.Stats, 20) - arch.SaveStats = ch - - ticker := time.NewTicker(time.Second) - var eta, bps uint64 - - go func(ch <-chan restic.Stats) { - - status := func(sec uint64) { - fmt.Printf("\x1b[2K\r[%s] %3.2f%% %s/s %s / %s ETA %s", - format_duration(sec), - float64(stats.Bytes)/float64(arch.Stats.Bytes)*100, - format_bytes(bps), - format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes), - format_duration(eta)) + var bps, eta uint64 + archiveProgress.F = func(s restic.Stat, d time.Duration, ticker bool) { + sec := uint64(d / time.Second) + if sec > 0 && ticker { + bps = s.Bytes / sec + eta = (targetStat.Bytes - s.Bytes) / bps } - defer ticker.Stop() - for { - select { - case s, ok := <-ch: - if !ok { - return - } - stats.Files += s.Files - stats.Directories += s.Directories - stats.Other += s.Other - stats.Bytes += s.Bytes + fmt.Printf("\x1b[2K\r[%s] %3.2f%% %s/s %s / %s ETA %s", + format_duration(d), + float64(s.Bytes)/float64(targetStat.Bytes)*100, + format_bytes(bps), + format_bytes(s.Bytes), format_bytes(targetStat.Bytes), + format_seconds(eta)) + } - status(uint64(time.Since(start) / time.Second)) - case <-ticker.C: - sec := uint64(time.Since(start) / time.Second) - bps = stats.Bytes / sec + archiveProgress.D = func(s restic.Stat, d time.Duration, ticker bool) { + sec := uint64(d / time.Second) + fmt.Printf("\nduration: %s, %.2fMiB/s\n", + format_duration(d), + float64(targetStat.Bytes)/float64(sec)/(1<<20)) + } + } - if bps > 0 { - eta = (arch.Stats.Bytes - stats.Bytes) / bps - } + arch, err := restic.NewArchiver(s, archiveProgress) + if err != nil { + fmt.Fprintf(os.Stderr, 
"err: %v\n", err) + } - status(sec) - } - } - }(ch) + arch.Error = func(dir string, fi os.FileInfo, err error) error { + // TODO: make ignoring errors configurable + fmt.Fprintf(os.Stderr, "\nerror for %s: %v\n%v\n", dir, err, fi) + return nil } _, id, err := arch.Snapshot(target, t, parentSnapshotID) @@ -177,23 +163,12 @@ func (cmd CmdBackup) Execute(args []string) error { fmt.Fprintf(os.Stderr, "error: %v\n", err) } - if terminal.IsTerminal(int(os.Stdout.Fd())) { - // close channels so that the goroutines terminate - close(arch.SaveStats) - close(arch.ScannerStats) - } - plen, err := s.PrefixLength(backend.Snapshot) if err != nil { return err } - fmt.Printf("\nsnapshot %s saved\n", id[:plen]) - - sec := uint64(time.Since(start) / time.Second) - fmt.Printf("duration: %s, %.2fMiB/s\n", - format_duration(sec), - float64(arch.Stats.Bytes)/float64(sec)/(1<<20)) + fmt.Printf("snapshot %s saved\n", id[:plen]) return nil } diff --git a/progress.go b/progress.go new file mode 100644 index 000000000..29db56624 --- /dev/null +++ b/progress.go @@ -0,0 +1,165 @@ +package restic + +import ( + "sync" + "time" +) + +type Progress struct { + F ProgressFunc + D ProgressFunc + fnM sync.Mutex + + cur Stat + curM sync.Mutex + start time.Time + c *time.Ticker + cancel chan struct{} + o sync.Once + d time.Duration + + running bool +} + +type Stat struct { + Files uint64 + Dirs uint64 + Other uint64 + Bytes uint64 +} + +type ProgressFunc func(s Stat, runtime time.Duration, ticker bool) + +// NewProgress returns a new progress reporter. After Start() has been called, +// the function fn is called when new data arrives or at least every d +// interval. The function doneFn is called when Done() is called. Both +// functions F and D are called synchronously and can use shared state. +func NewProgress(d time.Duration) *Progress { + return &Progress{d: d} +} + +// Start runs resets and runs the progress reporter. 
+func (p *Progress) Start() { + if p == nil { + return + } + + if p.running { + panic("trying to reset a running Progress") + } + + p.o = sync.Once{} + p.cancel = make(chan struct{}) + p.running = true + p.Reset() + p.start = time.Now() + p.c = time.NewTicker(p.d) + + go p.reporter() +} + +// Report adds the statistics from s to the current state and tries to report +// the accumulated statistics via the feedback channel. +func (p *Progress) Report(s Stat) { + if p == nil { + return + } + + if !p.running { + panic("reporting in a non-running Progress") + } + + p.curM.Lock() + p.cur.Add(s) + cur := p.cur + p.curM.Unlock() + + // update progress + if p.F != nil { + p.fnM.Lock() + p.F(cur, time.Since(p.start), false) + p.fnM.Unlock() + } +} + +func (p *Progress) reporter() { + if p == nil { + return + } + + for { + select { + case <-p.c.C: + p.curM.Lock() + cur := p.cur + p.curM.Unlock() + + if p.F != nil { + p.fnM.Lock() + p.F(cur, time.Since(p.start), true) + p.fnM.Unlock() + } + case <-p.cancel: + p.c.Stop() + return + } + } +} + +// Reset resets all statistic counters to zero. +func (p *Progress) Reset() { + if p == nil { + return + } + + if !p.running { + panic("resetting a non-running Progress") + } + + p.curM.Lock() + p.cur = Stat{} + p.curM.Unlock() +} + +// Done closes the progress report. +func (p *Progress) Done() { + if p == nil { + return + } + + if !p.running { + panic("Done() called on non-running Progress") + } + + if p.running { + p.running = false + p.o.Do(func() { + close(p.cancel) + }) + + cur := p.cur + + if p.D != nil { + p.fnM.Lock() + p.D(cur, time.Since(p.start), false) + p.fnM.Unlock() + } + } +} + +// Current returns the current stat value. +func (p *Progress) Current() Stat { + p.curM.Lock() + s := p.cur + p.curM.Unlock() + + return s +} + +// Add accumulates other into s. 
+func (s *Stat) Add(other Stat) { + s.Bytes += other.Bytes + s.Dirs += other.Dirs + s.Files += other.Files + s.Other += other.Other +} diff --git a/scanner.go b/scanner.go new file mode 100644 index 000000000..15d59dd06 --- /dev/null +++ b/scanner.go @@ -0,0 +1,121 @@ +package restic + +import ( + "os" + "path/filepath" + + "github.com/juju/arrar" +) + +type FilterFunc func(item string, fi os.FileInfo) bool +type ErrorFunc func(dir string, fi os.FileInfo, err error) error + +type Scanner struct { + Error ErrorFunc + Filter FilterFunc + + p *Progress +} + +func NewScanner(p *Progress) *Scanner { + sc := &Scanner{p: p} + + // abort on all errors + sc.Error = func(s string, fi os.FileInfo, err error) error { return err } + // allow all files + sc.Filter = func(string, os.FileInfo) bool { return true } + + return sc +} + +func scan(filterFn FilterFunc, progress *Progress, dir string) (*Tree, error) { + var err error + + // open and list path + fd, err := os.Open(dir) + defer fd.Close() + + if err != nil { + return nil, err + } + + entries, err := fd.Readdir(-1) + if err != nil { + return nil, err + } + + // build new tree + tree := Tree{} + for _, entry := range entries { + path := filepath.Join(dir, entry.Name()) + + if !filterFn(path, entry) { + continue + } + + node, err := NodeFromFileInfo(path, entry) + if err != nil { + // TODO: error processing + return nil, err + } + + err = tree.Insert(node) + if err != nil { + return nil, err + } + + if entry.IsDir() { + // save all errors in node.err, sort out later + node.Tree, node.err = scan(filterFn, progress, path) + } + } + + for _, node := range tree { + if node.Type == "file" && node.Content != nil { + continue + } + + switch node.Type { + case "file": + progress.Report(Stat{Files: 1, Bytes: node.Size}) + case "dir": + progress.Report(Stat{Dirs: 1}) + default: + progress.Report(Stat{Other: 1}) + } + } + + return &tree, nil +} + +func (sc *Scanner) Scan(path string) (*Tree, error) { + sc.p.Start() + defer sc.p.Done() 
+ + fi, err := os.Lstat(path) + if err != nil { + return nil, arrar.Annotatef(err, "Lstat(%q)", path) + } + + node, err := NodeFromFileInfo(path, fi) + if err != nil { + return nil, arrar.Annotate(err, "NodeFromFileInfo()") + } + + if node.Type != "dir" { + t := &Tree{node} + + sc.p.Report(Stat{Files: 1, Bytes: node.Size}) + + return t, nil + } + + sc.p.Report(Stat{Dirs: 1}) + + node.Tree, err = scan(sc.Filter, sc.p, path) + if err != nil { + return nil, arrar.Annotate(err, "loadTree()") + } + + return &Tree{node}, nil +} diff --git a/tree.go b/tree.go index 306093621..87a1ce106 100644 --- a/tree.go +++ b/tree.go @@ -40,6 +40,7 @@ type Node struct { Tree *Tree `json:"-"` path string + err error } var (