From a93bc3c9917a12a1051af83911a726c79f309b34 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 4 Jan 2015 16:10:30 +0100 Subject: [PATCH 1/6] Rename functions, remove code for compare * Archiver: Rename loadTree -> scan and LoadTree -> Scan * Archiver: Remove code to compare against an old snapshot/tree, the current implementation was really slow --- archiver.go | 112 +++------------------------------------ cmd/restic/cmd_backup.go | 2 +- 2 files changed, 8 insertions(+), 106 deletions(-) diff --git a/archiver.go b/archiver.go index 594b56673..2c4360f27 100644 --- a/archiver.go +++ b/archiver.go @@ -1,7 +1,6 @@ package restic import ( - "errors" "fmt" "io" "os" @@ -25,8 +24,7 @@ type Archiver struct { s Server ch *ContentHandler - bl *BlobList // blobs used for the current snapshot - parentBl *BlobList // blobs from the parent snapshot + bl *BlobList // blobs used for the current snapshot fileToken chan struct{} blobToken chan struct{} @@ -255,45 +253,8 @@ func (arch *Archiver) SaveFile(node *Node) error { return nil } -func (arch *Archiver) populateFromOldTree(tree, oldTree Tree) error { - // update content from old tree - err := tree.PopulateFrom(oldTree) - if err != nil { - return err - } - - // add blobs to bloblist - for _, node := range tree { - if node.Content != nil { - for _, blobID := range node.Content { - blob, err := arch.parentBl.Find(Blob{ID: blobID}) - if err != nil { - return err - } - - arch.bl.Insert(blob) - } - } - } - - return nil -} - -func (arch *Archiver) loadTree(dir string, oldTreeID backend.ID) (*Tree, error) { - var ( - oldTree Tree - err error - ) - - if oldTreeID != nil { - // load old tree - oldTree, err = LoadTree(arch.ch, oldTreeID) - if err != nil { - return nil, arrar.Annotate(err, "load old tree") - } - - debug("old tree: %v\n", oldTree) - } +func (arch *Archiver) scan(dir string) (*Tree, error) { + var err error // open and list path fd, err := os.Open(dir) @@ -328,29 +289,13 @@ func (arch *Archiver) loadTree(dir string, oldTreeID backend.ID) (*Tree, error) } if entry.IsDir() { - oldSubtree, err := oldTree.Find(node.Name) - if err != nil && err != ErrNodeNotFound { - return nil, err - } - - var oldSubtreeID backend.ID - if err == nil { - oldSubtreeID = oldSubtree.Subtree - } - - node.Tree, err = arch.loadTree(path, oldSubtreeID) + node.Tree, err = arch.scan(path) if err != nil { return nil, err } } } - // populate with content from oldTree - err = arch.populateFromOldTree(tree, oldTree) - if err != nil { - return nil, err - } - for _, node := range tree { if node.Type == "file" && node.Content != nil { continue @@ -372,34 +317,7 @@ func (arch *Archiver) loadTree(dir string, oldTreeID backend.ID) (*Tree, error) return &tree, nil } -func (arch *Archiver) LoadTree(path string, parentSnapshot backend.ID) (*Tree, error) { - var oldTree Tree - - if parentSnapshot != nil { - // load old tree from snapshot - snapshot, err := LoadSnapshot(arch.ch, parentSnapshot) - if err != nil { - return nil, arrar.Annotate(err, "load old snapshot") - } - - if snapshot.Tree == nil { - return nil, errors.New("snapshot without tree!") - } - - // load old bloblist from snapshot - arch.parentBl, err = LoadBlobList(arch.ch, snapshot.Map) - if err != nil { - return nil, err - } - - oldTree, err = LoadTree(arch.ch, snapshot.Tree) - if err != nil { - return nil, arrar.Annotate(err, "load old tree") - } - - debug("old tree: %v\n", oldTree) - } - +func (arch *Archiver) Scan(path string) (*Tree, error) { // reset global stats arch.updateStats = Stats{} @@ -416,13 +334,7 @@ 
func (arch *Archiver) LoadTree(path string, parentSnapshot backend.ID) (*Tree, e if node.Type != "dir" { t := &Tree{node} - // populate with content from oldTree - err = arch.populateFromOldTree(*t, oldTree) - if err != nil { - return nil, err - } - - // if no old node has been found, update stats + // update stats if node.Content == nil && node.Subtree == nil { arch.Stats.Files = 1 arch.Stats.Bytes = node.Size @@ -435,17 +347,7 @@ func (arch *Archiver) LoadTree(path string, parentSnapshot backend.ID) (*Tree, e arch.Stats.Directories = 1 - var oldSubtreeID backend.ID - oldSubtree, err := oldTree.Find(node.Name) - if err != nil && err != ErrNodeNotFound { - return nil, arrar.Annotate(err, "search node in old tree") - } - - if err == nil { - oldSubtreeID = oldSubtree.Subtree - } - - node.Tree, err = arch.loadTree(path, oldSubtreeID) + node.Tree, err = arch.scan(path) if err != nil { return nil, arrar.Annotate(err, "loadTree()") } diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index 03365635a..52226c76e 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -117,7 +117,7 @@ func (cmd CmdBackup) Execute(args []string) error { // return true // } - t, err := arch.LoadTree(target, parentSnapshotID) + t, err := arch.Scan(target) if err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) return err From 4b70bba588304ee254a5c580443ada7dd277e1e1 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 4 Jan 2015 18:23:00 +0100 Subject: [PATCH 2/6] Split Scanner from Archiver, refactor Progress --- archiver.go | 175 +++------------------------------------ cmd/restic/cmd_backup.go | 119 +++++++++++--------------- progress.go | 165 ++++++++++++++++++++++++++++++++++++ scanner.go | 121 +++++++++++++++++++++++++++ tree.go | 1 + 5 files changed, 344 insertions(+), 237 deletions(-) create mode 100644 progress.go create mode 100644 scanner.go diff --git a/archiver.go b/archiver.go index 2c4360f27..af1e4a42d 100644 --- a/archiver.go +++ b/archiver.go @@ -4,9 +4,7 @@ import ( "fmt" "io" "os" - "path/filepath" "sync" - "time" "github.com/juju/arrar" "github.com/restic/restic/backend" @@ -16,8 +14,6 @@ import ( const ( maxConcurrentFiles = 8 maxConcurrentBlobs = 8 - - statTimeout = 20 * time.Millisecond ) type Archiver struct { @@ -29,36 +25,17 @@ type Archiver struct { fileToken chan struct{} blobToken chan struct{} - Stats Stats - Error func(dir string, fi os.FileInfo, err error) error Filter func(item string, fi os.FileInfo) bool - ScannerStats chan Stats - SaveStats chan Stats - - statsMutex sync.Mutex - updateStats Stats + p *Progress } -type Stats struct { - Files int - Directories int - Other int - Bytes uint64 -} - -func (s *Stats) Add(other Stats) { - s.Bytes += other.Bytes - s.Directories += other.Directories - s.Files += other.Files - s.Other += other.Other -} - -func NewArchiver(s Server) (*Archiver, error) { +func NewArchiver(s Server, p *Progress) (*Archiver, error) { var err error arch := &Archiver{ s: s, + p: p, fileToken: make(chan struct{}, maxConcurrentFiles), blobToken: make(chan struct{}, maxConcurrentBlobs), } @@ -92,34 +69,6 @@ func NewArchiver(s Server) (*Archiver, error) { return arch, nil } -func (arch *Archiver) update(ch chan Stats, stats Stats) { - if ch == nil { - return - } - - // load old stats from global state - arch.statsMutex.Lock() - stats.Add(arch.updateStats) - arch.updateStats = Stats{} - arch.statsMutex.Unlock() - - // try to send stats through the channel, with a timeout - timeout := time.After(statTimeout) - - select { - case ch <- 
stats: - break - case _ = <-timeout: - - // save cumulated stats to global state - arch.statsMutex.Lock() - arch.updateStats.Add(stats) - arch.statsMutex.Unlock() - - break - } -} - func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) { blob, err := arch.ch.Save(t, data) if err != nil { @@ -179,7 +128,7 @@ func (arch *Archiver) SaveFile(node *Node) error { return arrar.Annotate(err, "SaveFile() save chunk") } - arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) + arch.p.Report(Stat{Bytes: blob.Size}) blobs = Blobs{blob} } @@ -219,7 +168,7 @@ func (arch *Archiver) SaveFile(node *Node) error { FreeChunkBuf("blob chunker", buf) - arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) + arch.p.Report(Stat{Bytes: blob.Size}) arch.blobToken <- token ch <- blob }(resCh) @@ -253,110 +202,6 @@ func (arch *Archiver) SaveFile(node *Node) error { return nil } -func (arch *Archiver) scan(dir string) (*Tree, error) { - var err error - - // open and list path - fd, err := os.Open(dir) - defer fd.Close() - if err != nil { - return nil, arch.Error(dir, nil, err) - } - - entries, err := fd.Readdir(-1) - if err != nil { - return nil, err - } - - // build new tree - tree := Tree{} - for _, entry := range entries { - path := filepath.Join(dir, entry.Name()) - - if !arch.Filter(path, entry) { - continue - } - - node, err := NodeFromFileInfo(path, entry) - if err != nil { - // TODO: error processing - return nil, err - } - - err = tree.Insert(node) - if err != nil { - return nil, err - } - - if entry.IsDir() { - node.Tree, err = arch.scan(path) - if err != nil { - return nil, err - } - } - } - - for _, node := range tree { - if node.Type == "file" && node.Content != nil { - continue - } - - switch node.Type { - case "file": - arch.Stats.Files++ - arch.Stats.Bytes += node.Size - case "dir": - arch.Stats.Directories++ - default: - arch.Stats.Other++ - } - } - - arch.update(arch.ScannerStats, arch.Stats) - - return &tree, nil -} - -func (arch *Archiver) Scan(path string) (*Tree, error) { - // reset global stats - arch.updateStats = Stats{} - - fi, err := os.Lstat(path) - if err != nil { - return nil, arrar.Annotatef(err, "Lstat(%q)", path) - } - - node, err := NodeFromFileInfo(path, fi) - if err != nil { - return nil, arrar.Annotate(err, "NodeFromFileInfo()") - } - - if node.Type != "dir" { - t := &Tree{node} - - // update stats - if node.Content == nil && node.Subtree == nil { - arch.Stats.Files = 1 - arch.Stats.Bytes = node.Size - } - - arch.update(arch.ScannerStats, arch.Stats) - - return t, nil - } - - arch.Stats.Directories = 1 - - node.Tree, err = arch.scan(path) - if err != nil { - return nil, arrar.Annotate(err, "loadTree()") - } - - arch.update(arch.ScannerStats, arch.Stats) - - return &Tree{node}, nil -} - func (arch *Archiver) saveTree(t *Tree) (Blob, error) { var wg sync.WaitGroup @@ -367,7 +212,7 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { return Blob{}, err } node.Subtree = b.ID - arch.update(arch.SaveStats, Stats{Directories: 1}) + arch.p.Report(Stat{Dirs: 1}) } else if node.Type == "file" && len(node.Content) == 0 { // get token token := <-arch.fileToken @@ -385,10 +230,10 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { if err != nil { panic(err) } - arch.update(arch.SaveStats, Stats{Files: 1}) + arch.p.Report(Stat{Files: 1}) }(node) } else { - arch.update(arch.SaveStats, Stats{Other: 1}) + arch.p.Report(Stat{Other: 1}) } } @@ -410,8 +255,8 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { } func (arch *Archiver) Snapshot(dir string, t *Tree, 
parentSnapshot backend.ID) (*Snapshot, backend.ID, error) { - // reset global stats - arch.updateStats = Stats{} + arch.p.Start() + defer arch.p.Done() sn, err := NewSnapshot(dir) if err != nil { diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index 52226c76e..ebcc5c0fe 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -40,7 +40,7 @@ func format_bytes(c uint64) string { } } -func format_duration(sec uint64) string { +func format_seconds(sec uint64) string { hours := sec / 3600 sec -= hours * 3600 min := sec / 60 @@ -52,6 +52,11 @@ func format_duration(sec uint64) string { return fmt.Sprintf("%d:%02d", min, sec) } +func format_duration(d time.Duration) string { + sec := uint64(d / time.Second) + return format_seconds(sec) +} + func print_tree2(indent int, t *restic.Tree) { for _, node := range *t { if node.Tree != nil { @@ -89,27 +94,16 @@ func (cmd CmdBackup) Execute(args []string) error { fmt.Printf("found parent snapshot %v\n", parentSnapshotID) } - arch, err := restic.NewArchiver(s) - if err != nil { - fmt.Fprintf(os.Stderr, "err: %v\n", err) - } - arch.Error = func(dir string, fi os.FileInfo, err error) error { - // TODO: make ignoring errors configurable - fmt.Fprintf(os.Stderr, "\nerror for %s: %v\n%v\n", dir, err, fi) - return nil - } - fmt.Printf("scanning %s\n", target) + scanProgress := restic.NewProgress(time.Second) if terminal.IsTerminal(int(os.Stdout.Fd())) { - ch := make(chan restic.Stats, 20) - arch.ScannerStats = ch - - go func(ch <-chan restic.Stats) { - for stats := range ch { - fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes)) - } - }(ch) + scanProgress.F = func(s restic.Stat, d time.Duration, ticker bool) { + fmt.Printf("\x1b[2K\r[%s] %d directories, %d files, %s", format_duration(d), s.Dirs, s.Files, format_bytes(s.Bytes)) + } + scanProgress.D = func(s restic.Stat, d time.Duration, ticker bool) { + fmt.Printf("\nDone in %s\n", format_duration(d)) + } } // TODO: add filter @@ -117,59 +111,51 @@ func (cmd CmdBackup) Execute(args []string) error { // return true // } - t, err := arch.Scan(target) + sc := restic.NewScanner(scanProgress) + + t, err := sc.Scan(target) if err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) return err } - fmt.Printf("\r%6d directories, %6d files, %14s\n", arch.Stats.Directories, arch.Stats.Files, format_bytes(arch.Stats.Bytes)) + archiveProgress := restic.NewProgress(time.Second) + targetStat := scanProgress.Current() - stats := restic.Stats{} - start := time.Now() if terminal.IsTerminal(int(os.Stdout.Fd())) { - ch := make(chan restic.Stats, 20) - arch.SaveStats = ch - - ticker := time.NewTicker(time.Second) - var eta, bps uint64 - - go func(ch <-chan restic.Stats) { - - status := func(sec uint64) { - fmt.Printf("\x1b[2K\r[%s] %3.2f%% %s/s %s / %s ETA %s", - format_duration(sec), - float64(stats.Bytes)/float64(arch.Stats.Bytes)*100, - format_bytes(bps), - format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes), - format_duration(eta)) + var bps, eta uint64 + archiveProgress.F = func(s restic.Stat, d time.Duration, ticker bool) { + sec := uint64(d / time.Second) + if sec > 0 && ticker { + bps = s.Bytes / sec + eta = (targetStat.Bytes - s.Bytes) / bps } - defer ticker.Stop() - for { - select { - case s, ok := <-ch: - if !ok { - return - } - stats.Files += s.Files - stats.Directories += s.Directories - stats.Other += s.Other - stats.Bytes += s.Bytes + fmt.Printf("\x1b[2K\r[%s] %3.2f%% %s/s %s / %s ETA %s", + format_duration(d), + 
float64(s.Bytes)/float64(targetStat.Bytes)*100, + format_bytes(bps), + format_bytes(s.Bytes), format_bytes(targetStat.Bytes), + format_seconds(eta)) + } - status(uint64(time.Since(start) / time.Second)) - case <-ticker.C: - sec := uint64(time.Since(start) / time.Second) - bps = stats.Bytes / sec + archiveProgress.D = func(s restic.Stat, d time.Duration, ticker bool) { + sec := uint64(d / time.Second) + fmt.Printf("\nduration: %s, %.2fMiB/s\n", + format_duration(d), + float64(targetStat.Bytes)/float64(sec)/(1<<20)) + } + } - if bps > 0 { - eta = (arch.Stats.Bytes - stats.Bytes) / bps - } + arch, err := restic.NewArchiver(s, archiveProgress) + if err != nil { + fmt.Fprintf(os.Stderr, "err: %v\n", err) + } - status(sec) - } - } - }(ch) + arch.Error = func(dir string, fi os.FileInfo, err error) error { + // TODO: make ignoring errors configurable + fmt.Fprintf(os.Stderr, "\nerror for %s: %v\n%v\n", dir, err, fi) + return nil } _, id, err := arch.Snapshot(target, t, parentSnapshotID) @@ -177,23 +163,12 @@ func (cmd CmdBackup) Execute(args []string) error { fmt.Fprintf(os.Stderr, "error: %v\n", err) } - if terminal.IsTerminal(int(os.Stdout.Fd())) { - // close channels so that the goroutines terminate - close(arch.SaveStats) - close(arch.ScannerStats) - } - plen, err := s.PrefixLength(backend.Snapshot) if err != nil { return err } - fmt.Printf("\nsnapshot %s saved\n", id[:plen]) - - sec := uint64(time.Since(start) / time.Second) - fmt.Printf("duration: %s, %.2fMiB/s\n", - format_duration(sec), - float64(arch.Stats.Bytes)/float64(sec)/(1<<20)) + fmt.Printf("snapshot %s saved\n", id[:plen]) return nil } diff --git a/progress.go b/progress.go new file mode 100644 index 000000000..29db56624 --- /dev/null +++ b/progress.go @@ -0,0 +1,165 @@ +package restic + +import ( + "sync" + "time" +) + +type Progress struct { + F ProgressFunc + D ProgressFunc + fnM sync.Mutex + + cur Stat + curM sync.Mutex + start time.Time + c *time.Ticker + cancel chan struct{} + o sync.Once + d time.Duration + + running bool +} + +type Stat struct { + Files uint64 + Dirs uint64 + Other uint64 + Bytes uint64 +} + +type ProgressFunc func(s Stat, runtime time.Duration, ticker bool) + +// NewProgress returns a new progress reporter. After Start() has been called, +// the function fn is called when new data arrives or at least every d +// interval. The function doneFn is called when Done() is called. Both +// functions F and D are called synchronously and can use shared state. +func NewProgress(d time.Duration) *Progress { + return &Progress{d: d} +} + +// Start runs resets and runs the progress reporter. +func (p *Progress) Start() { + if p == nil { + return + } + + if p.running { + panic("truing to reset a running Progress") + } + + p.o = sync.Once{} + p.cancel = make(chan struct{}) + p.running = true + p.Reset() + p.start = time.Now() + p.c = time.NewTicker(p.d) + + go p.reporter() +} + +// Report adds the statistics from s to the current state and tries to report +// the accumulated statistics via the feedback channel. 
+func (p *Progress) Report(s Stat) { + if p == nil { + return + } + + if !p.running { + panic("reporting in a non-running Progress") + } + + p.curM.Lock() + p.cur.Add(s) + cur := p.cur + p.curM.Unlock() + + // update progress + if p.F != nil { + p.fnM.Lock() + p.F(cur, time.Since(p.start), false) + p.fnM.Unlock() + } +} + +func (p *Progress) reporter() { + if p == nil { + return + } + + for { + select { + case <-p.c.C: + p.curM.Lock() + cur := p.cur + p.curM.Unlock() + + if p.F != nil { + p.fnM.Lock() + p.F(cur, time.Since(p.start), true) + p.fnM.Unlock() + } + case <-p.cancel: + p.c.Stop() + return + } + } +} + +// Reset resets all statistic counters to zero. +func (p *Progress) Reset() { + if p == nil { + return + } + + if !p.running { + panic("resetting a non-running Progress") + } + + p.curM.Lock() + p.cur = Stat{} + p.curM.Unlock() +} + +// Done closes the progress report. +func (p *Progress) Done() { + if p == nil { + return + } + + if !p.running { + panic("Done() called on non-running Progress") + } + + if p.running { + p.running = false + p.o.Do(func() { + close(p.cancel) + }) + + cur := p.cur + + if p.D != nil { + p.fnM.Lock() + p.D(cur, time.Since(p.start), false) + p.fnM.Unlock() + } + } +} + +// Current returns the current stat value. +func (p *Progress) Current() Stat { + p.curM.Lock() + s := p.cur + p.curM.Unlock() + + return s +} + +// Add accumulates other into s. +func (s *Stat) Add(other Stat) { + s.Bytes += other.Bytes + s.Dirs += other.Dirs + s.Files += other.Files + s.Other += other.Other +} diff --git a/scanner.go b/scanner.go new file mode 100644 index 000000000..15d59dd06 --- /dev/null +++ b/scanner.go @@ -0,0 +1,121 @@ +package restic + +import ( + "os" + "path/filepath" + + "github.com/juju/arrar" +) + +type FilterFunc func(item string, fi os.FileInfo) bool +type ErrorFunc func(dir string, fi os.FileInfo, err error) error + +type Scanner struct { + Error ErrorFunc + Filter FilterFunc + + p *Progress +} + +func NewScanner(p *Progress) *Scanner { + sc := &Scanner{p: p} + + // abort on all errors + sc.Error = func(s string, fi os.FileInfo, err error) error { return err } + // allow all files + sc.Filter = func(string, os.FileInfo) bool { return true } + + return sc +} + +func scan(filterFn FilterFunc, progress *Progress, dir string) (*Tree, error) { + var err error + + // open and list path + fd, err := os.Open(dir) + defer fd.Close() + + if err != nil { + return nil, err + } + + entries, err := fd.Readdir(-1) + if err != nil { + return nil, err + } + + // build new tree + tree := Tree{} + for _, entry := range entries { + path := filepath.Join(dir, entry.Name()) + + if !filterFn(path, entry) { + continue + } + + node, err := NodeFromFileInfo(path, entry) + if err != nil { + // TODO: error processing + return nil, err + } + + err = tree.Insert(node) + if err != nil { + return nil, err + } + + if entry.IsDir() { + // save all errors in node.err, sort out later + node.Tree, node.err = scan(filterFn, progress, path) + } + } + + for _, node := range tree { + if node.Type == "file" && node.Content != nil { + continue + } + + switch node.Type { + case "file": + progress.Report(Stat{Files: 1, Bytes: node.Size}) + case "dir": + progress.Report(Stat{Dirs: 1}) + default: + progress.Report(Stat{Other: 1}) + } + } + + return &tree, nil +} + +func (sc *Scanner) Scan(path string) (*Tree, error) { + sc.p.Start() + defer sc.p.Done() + + fi, err := os.Lstat(path) + if err != nil { + return nil, arrar.Annotatef(err, "Lstat(%q)", path) + } + + node, err := NodeFromFileInfo(path, fi) + 
if err != nil { + return nil, arrar.Annotate(err, "NodeFromFileInfo()") + } + + if node.Type != "dir" { + t := &Tree{node} + + sc.p.Report(Stat{Files: 1, Bytes: node.Size}) + + return t, nil + } + + sc.p.Report(Stat{Dirs: 1}) + + node.Tree, err = scan(sc.Filter, sc.p, path) + if err != nil { + return nil, arrar.Annotate(err, "loadTree()") + } + + return &Tree{node}, nil +} diff --git a/tree.go b/tree.go index 306093621..87a1ce106 100644 --- a/tree.go +++ b/tree.go @@ -40,6 +40,7 @@ type Node struct { Tree *Tree `json:"-"` path string + err error } var ( From fe231af7fcff92cafccb9ecde77b3455d12dab88 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 4 Jan 2015 20:07:30 +0100 Subject: [PATCH 3/6] Remove error from return value of NewContentHandler --- archiver.go | 5 +---- cmd/restic/cmd_cat.go | 5 +---- cmd/restic/cmd_find.go | 6 +----- cmd/restic/cmd_fsck.go | 5 +---- cmd/restic/cmd_ls.go | 2 +- cmd/restic/cmd_snapshots.go | 5 +---- contenthandler.go | 4 ++-- restorer.go | 5 +---- 8 files changed, 9 insertions(+), 28 deletions(-) diff --git a/archiver.go b/archiver.go index af1e4a42d..c3d1b8e54 100644 --- a/archiver.go +++ b/archiver.go @@ -55,10 +55,7 @@ func NewArchiver(s Server, p *Progress) (*Archiver, error) { arch.Filter = func(string, os.FileInfo) bool { return true } arch.bl = NewBlobList() - arch.ch, err = NewContentHandler(s) - if err != nil { - return nil, err - } + arch.ch = NewContentHandler(s) // load all blobs from all snapshots err = arch.ch.LoadAllMaps() diff --git a/cmd/restic/cmd_cat.go b/cmd/restic/cmd_cat.go index 2ee657bc3..47acbe38e 100644 --- a/cmd/restic/cmd_cat.go +++ b/cmd/restic/cmd_cat.go @@ -53,10 +53,7 @@ func (cmd CmdCat) Execute(args []string) error { } } - ch, err := restic.NewContentHandler(s) - if err != nil { - return err - } + ch := restic.NewContentHandler(s) switch tpe { case "blob": diff --git a/cmd/restic/cmd_find.go b/cmd/restic/cmd_find.go index 55cec11be..544d5be28 100644 --- a/cmd/restic/cmd_find.go +++ b/cmd/restic/cmd_find.go @@ -113,11 +113,7 @@ func (c CmdFind) findInTree(ch *restic.ContentHandler, id backend.ID, path strin func (c CmdFind) findInSnapshot(s restic.Server, id backend.ID) error { debug("searching in snapshot %s\n for entries within [%s %s]", id, c.oldest, c.newest) - ch, err := restic.NewContentHandler(s) - if err != nil { - return err - } - + ch := restic.NewContentHandler(s) sn, err := ch.LoadSnapshot(id) if err != nil { return err diff --git a/cmd/restic/cmd_fsck.go b/cmd/restic/cmd_fsck.go index 44e8b307d..033a650a6 100644 --- a/cmd/restic/cmd_fsck.go +++ b/cmd/restic/cmd_fsck.go @@ -125,10 +125,7 @@ func fsckTree(opts CmdFsck, ch *restic.ContentHandler, id backend.ID) error { func fsck_snapshot(opts CmdFsck, s restic.Server, id backend.ID) error { debug("checking snapshot %v\n", id) - ch, err := restic.NewContentHandler(s) - if err != nil { - return err - } + ch := restic.NewContentHandler(s) sn, err := ch.LoadSnapshot(id) if err != nil { diff --git a/cmd/restic/cmd_ls.go b/cmd/restic/cmd_ls.go index e769ee456..037d6b488 100644 --- a/cmd/restic/cmd_ls.go +++ b/cmd/restic/cmd_ls.go @@ -78,7 +78,7 @@ func (cmd CmdLs) Execute(args []string) error { return err } - ch, err := restic.NewContentHandler(s) + ch := restic.NewContentHandler(s) if err != nil { return err } diff --git a/cmd/restic/cmd_snapshots.go b/cmd/restic/cmd_snapshots.go index 5432d29ae..1a1a7c25b 100644 --- a/cmd/restic/cmd_snapshots.go +++ b/cmd/restic/cmd_snapshots.go @@ -97,10 +97,7 @@ func (cmd CmdSnapshots) Execute(args []string) error { return 
err } - ch, err := restic.NewContentHandler(s) - if err != nil { - return err - } + ch := restic.NewContentHandler(s) tab := NewTable() tab.Header = fmt.Sprintf("%-8s %-19s %-10s %s", "ID", "Date", "Source", "Directory") diff --git a/contenthandler.go b/contenthandler.go index cba3dc833..202ae9049 100644 --- a/contenthandler.go +++ b/contenthandler.go @@ -17,13 +17,13 @@ type ContentHandler struct { } // NewContentHandler creates a new content handler. -func NewContentHandler(s Server) (*ContentHandler, error) { +func NewContentHandler(s Server) *ContentHandler { ch := &ContentHandler{ s: s, bl: NewBlobList(), } - return ch, nil + return ch } // LoadSnapshot adds all blobs from a snapshot into the content handler and returns the snapshot. diff --git a/restorer.go b/restorer.go index b57d83e77..c8d4b2d5b 100644 --- a/restorer.go +++ b/restorer.go @@ -25,10 +25,7 @@ func NewRestorer(s Server, snid backend.ID) (*Restorer, error) { r := &Restorer{s: s} var err error - r.ch, err = NewContentHandler(s) - if err != nil { - return nil, arrar.Annotate(err, "create contenthandler for restorer") - } + r.ch = NewContentHandler(s) r.sn, err = r.ch.LoadSnapshot(snid) if err != nil { From 2a97e2b08a05b50e55c8049676f9eb46ef2216b4 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 4 Jan 2015 20:11:32 +0100 Subject: [PATCH 4/6] Add tree.Stat() --- progress.go | 22 ++++++++++++++++++++++ tree.go | 18 ++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/progress.go b/progress.go index 29db56624..8bb427808 100644 --- a/progress.go +++ b/progress.go @@ -1,6 +1,7 @@ package restic import ( + "fmt" "sync" "time" ) @@ -163,3 +164,24 @@ func (s *Stat) Add(other Stat) { s.Files += other.Files s.Other += other.Other } + +func (s Stat) String() string { + b := float64(s.Bytes) + var str string + + switch { + case s.Bytes > 1<<40: + str = fmt.Sprintf("%.3f TiB", b/(1<<40)) + case s.Bytes > 1<<30: + str = fmt.Sprintf("%.3f GiB", b/(1<<30)) + case s.Bytes > 1<<20: + str = fmt.Sprintf("%.3f MiB", b/(1<<20)) + case s.Bytes > 1<<10: + str = fmt.Sprintf("%.3f KiB", b/(1<<10)) + default: + str = fmt.Sprintf("%dB", s.Bytes) + } + + return fmt.Sprintf("Stat(%d files, %d dirs, %d other, %v)", + s.Files, s.Dirs, s.Other, str) +} diff --git a/tree.go b/tree.go index 87a1ce106..db51404be 100644 --- a/tree.go +++ b/tree.go @@ -151,6 +151,24 @@ func (t Tree) Find(name string) (*Node, error) { return node, err } +func (t Tree) Stat() Stat { + s := Stat{} + for _, n := range t { + switch n.Type { + case "file": + s.Files++ + s.Bytes += n.Size + case "dir": + s.Dirs++ + s.Add(n.Tree.Stat()) + default: + s.Other++ + } + } + + return s +} + func (node *Node) fill_extra(path string, fi os.FileInfo) (err error) { stat, ok := fi.Sys().(*syscall.Stat_t) if !ok { From e543f5926c7ea63c561847862acf798464c7fece Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 4 Jan 2015 22:39:30 +0100 Subject: [PATCH 5/6] Refactor Archiver and backup command Improve incremental backup by first scanning the tree, loading the old tree and afterwards comparing both trees in memory. 
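In outline, the refactored backup command wires these pieces together as sketched below. This is an illustrative sketch only, condensed from the cmd_backup.go changes in this patch; it is not part of the diff itself. It assumes the imports and setup already present in cmd/restic/cmd_backup.go (restic, backend, path/filepath, and the server, target, progress and archiver variables created in the hunk further down); progress/terminal output and error formatting are omitted.

    // Sketch: scan first, then reuse unchanged data from the parent snapshot.
    // Assumes the surrounding cmd_backup.go context (s, target, scanProgress,
    // parentSnapshotID and arch are set up as in the hunk below).
    func backupSketch(s restic.Server, arch *restic.Archiver, scanProgress *restic.Progress,
        target string, parentSnapshotID backend.ID) error {

        // 1. walk the filesystem into an in-memory tree
        sc := restic.NewScanner(scanProgress)
        newTree, err := sc.Scan(target)
        if err != nil {
            return err
        }

        // 2. if a parent snapshot exists, load its tree recursively ...
        if parentSnapshotID != nil {
            ch := restic.NewContentHandler(s)
            sn, err := ch.LoadSnapshot(parentSnapshotID)
            if err != nil {
                return err
            }

            oldTree, err := restic.LoadTreeRecursive(filepath.Dir(sn.Dir), ch, sn.Tree)
            if err != nil {
                return err
            }

            // ... and copy content and subtree IDs of unchanged nodes into the
            // freshly scanned tree, so the archiver only saves what changed
            newTree.CopyFrom(oldTree)
        }

        // 3. archive the (partially pre-filled) tree
        _, _, err = arch.Snapshot(target, newTree, parentSnapshotID)
        return err
    }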
--- archiver.go | 31 ++++++++++--- cmd/restic/cmd_backup.go | 43 +++++++++++++----- contenthandler.go | 5 +++ progress.go | 6 +-- scanner.go | 6 +-- tree.go | 94 ++++++++++++++++++++++++++++++++++------ 6 files changed, 149 insertions(+), 36 deletions(-) diff --git a/archiver.go b/archiver.go index c3d1b8e54..27418645b 100644 --- a/archiver.go +++ b/archiver.go @@ -1,6 +1,7 @@ package restic import ( + "errors" "fmt" "io" "os" @@ -58,6 +59,7 @@ func NewArchiver(s Server, p *Progress) (*Archiver, error) { arch.ch = NewContentHandler(s) // load all blobs from all snapshots + // TODO: only use bloblist from old snapshot if available err = arch.ch.LoadAllMaps() if err != nil { return nil, err @@ -96,7 +98,28 @@ func (arch *Archiver) SaveFile(node *Node) error { file, err := os.Open(node.path) defer file.Close() if err != nil { - return arrar.Annotate(err, "SaveFile()") + return arrar.Annotatef(err, "SaveFile(%v)", node.path) + } + + // check file again + fi, err := file.Stat() + if err != nil { + return err + } + + if fi.ModTime() != node.ModTime { + e2 := arch.Error(node.path, fi, errors.New("file changed as we read it\n")) + + if e2 == nil { + // create new node + n, err := NodeFromFileInfo(node.path, fi) + if err != nil { + return err + } + + // copy node + *node = *n + } } var blobs Blobs @@ -203,8 +226,8 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { var wg sync.WaitGroup for _, node := range *t { - if node.Tree != nil && node.Subtree == nil { - b, err := arch.saveTree(node.Tree) + if node.tree != nil && node.Subtree == nil { + b, err := arch.saveTree(node.tree) if err != nil { return Blob{}, err } @@ -229,8 +252,6 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { } arch.p.Report(Stat{Files: 1}) }(node) - } else { - arch.p.Report(Stat{Other: 1}) } } diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index ebcc5c0fe..290938c26 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -3,6 +3,7 @@ package main import ( "fmt" "os" + "path/filepath" "strings" "time" @@ -59,9 +60,9 @@ func format_duration(d time.Duration) string { func print_tree2(indent int, t *restic.Tree) { for _, node := range *t { - if node.Tree != nil { + if node.Tree() != nil { fmt.Printf("%s%s/\n", strings.Repeat(" ", indent), node.Name) - print_tree2(indent+1, node.Tree) + print_tree2(indent+1, node.Tree()) } else { fmt.Printf("%s%s\n", strings.Repeat(" ", indent), node.Name) } @@ -94,7 +95,7 @@ func (cmd CmdBackup) Execute(args []string) error { fmt.Printf("found parent snapshot %v\n", parentSnapshotID) } - fmt.Printf("scanning %s\n", target) + fmt.Printf("scan %s\n", target) scanProgress := restic.NewProgress(time.Second) if terminal.IsTerminal(int(os.Stdout.Fd())) { @@ -113,29 +114,51 @@ func (cmd CmdBackup) Execute(args []string) error { sc := restic.NewScanner(scanProgress) - t, err := sc.Scan(target) + newTree, err := sc.Scan(target) if err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) return err } + if parentSnapshotID != nil { + fmt.Printf("load old snapshot\n") + ch := restic.NewContentHandler(s) + sn, err := ch.LoadSnapshot(parentSnapshotID) + if err != nil { + return err + } + + oldTree, err := restic.LoadTreeRecursive(filepath.Dir(sn.Dir), ch, sn.Tree) + if err != nil { + return err + } + + newTree.CopyFrom(oldTree) + } + archiveProgress := restic.NewProgress(time.Second) - targetStat := scanProgress.Current() + targetStat := newTree.StatTodo() if terminal.IsTerminal(int(os.Stdout.Fd())) { var bps, eta uint64 + itemsTodo := targetStat.Files + 
targetStat.Dirs + archiveProgress.F = func(s restic.Stat, d time.Duration, ticker bool) { sec := uint64(d / time.Second) - if sec > 0 && ticker { + if targetStat.Bytes > 0 && sec > 0 && ticker { bps = s.Bytes / sec - eta = (targetStat.Bytes - s.Bytes) / bps + if bps > 0 { + eta = (targetStat.Bytes - s.Bytes) / bps + } } - fmt.Printf("\x1b[2K\r[%s] %3.2f%% %s/s %s / %s ETA %s", + itemsDone := s.Files + s.Dirs + fmt.Printf("\x1b[2K\r[%s] %3.2f%% %s/s %s / %s %d / %d items ETA %s", format_duration(d), float64(s.Bytes)/float64(targetStat.Bytes)*100, format_bytes(bps), format_bytes(s.Bytes), format_bytes(targetStat.Bytes), + itemsDone, itemsTodo, format_seconds(eta)) } @@ -154,11 +177,11 @@ func (cmd CmdBackup) Execute(args []string) error { arch.Error = func(dir string, fi os.FileInfo, err error) error { // TODO: make ignoring errors configurable - fmt.Fprintf(os.Stderr, "\nerror for %s: %v\n%v\n", dir, err, fi) + fmt.Fprintf(os.Stderr, "\nerror for %s: %v\n", dir, err) return nil } - _, id, err := arch.Snapshot(target, t, parentSnapshotID) + _, id, err := arch.Snapshot(target, newTree, parentSnapshotID) if err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) } diff --git a/contenthandler.go b/contenthandler.go index 202ae9049..7381178e8 100644 --- a/contenthandler.go +++ b/contenthandler.go @@ -243,3 +243,8 @@ func (ch *ContentHandler) Test(t backend.Type, id backend.ID) (bool, error) { return ch.s.Test(t, id) } + +// BlobList returns the current BlobList. +func (ch *ContentHandler) BlobList() *BlobList { + return ch.bl +} diff --git a/progress.go b/progress.go index 8bb427808..086b06a5c 100644 --- a/progress.go +++ b/progress.go @@ -25,7 +25,6 @@ type Progress struct { type Stat struct { Files uint64 Dirs uint64 - Other uint64 Bytes uint64 } @@ -162,7 +161,6 @@ func (s *Stat) Add(other Stat) { s.Bytes += other.Bytes s.Dirs += other.Dirs s.Files += other.Files - s.Other += other.Other } func (s Stat) String() string { @@ -182,6 +180,6 @@ func (s Stat) String() string { str = fmt.Sprintf("%dB", s.Bytes) } - return fmt.Sprintf("Stat(%d files, %d dirs, %d other, %v)", - s.Files, s.Dirs, s.Other, str) + return fmt.Sprintf("Stat(%d files, %d dirs, %v)", + s.Files, s.Dirs, str) } diff --git a/scanner.go b/scanner.go index 15d59dd06..9c90465d1 100644 --- a/scanner.go +++ b/scanner.go @@ -66,7 +66,7 @@ func scan(filterFn FilterFunc, progress *Progress, dir string) (*Tree, error) { if entry.IsDir() { // save all errors in node.err, sort out later - node.Tree, node.err = scan(filterFn, progress, path) + node.tree, node.err = scan(filterFn, progress, path) } } @@ -80,8 +80,6 @@ func scan(filterFn FilterFunc, progress *Progress, dir string) (*Tree, error) { progress.Report(Stat{Files: 1, Bytes: node.Size}) case "dir": progress.Report(Stat{Dirs: 1}) - default: - progress.Report(Stat{Other: 1}) } } @@ -112,7 +110,7 @@ func (sc *Scanner) Scan(path string) (*Tree, error) { sc.p.Report(Stat{Dirs: 1}) - node.Tree, err = scan(sc.Filter, sc.p, path) + node.tree, err = scan(sc.Filter, sc.p, path) if err != nil { return nil, arrar.Annotate(err, "loadTree()") } diff --git a/tree.go b/tree.go index db51404be..64398ad09 100644 --- a/tree.go +++ b/tree.go @@ -6,6 +6,8 @@ import ( "fmt" "os" "os/user" + "path/filepath" + "reflect" "sort" "strconv" "strings" @@ -37,7 +39,7 @@ type Node struct { Content []backend.ID `json:"content"` Subtree backend.ID `json:"subtree,omitempty"` - Tree *Tree `json:"-"` + tree *Tree path string err error @@ -92,11 +94,33 @@ func LoadTree(ch *ContentHandler, id backend.ID) (Tree, error) 
{ return tree, nil } -// PopulateFrom copies subtrees and content from other when it hasn't changed. -func (t Tree) PopulateFrom(other Tree) error { +// LoadTreeRecursive loads the tree and all subtrees via ch. +func LoadTreeRecursive(path string, ch *ContentHandler, id backend.ID) (Tree, error) { + tree, err := LoadTree(ch, id) + if err != nil { + return nil, err + } + + for _, n := range tree { + n.path = filepath.Join(path, n.Name) + if n.Type == "dir" && n.Subtree != nil { + t, err := LoadTreeRecursive(n.path, ch, n.Subtree) + if err != nil { + return nil, err + } + + n.tree = &t + } + } + + return tree, nil +} + +// CopyFrom recursively copies all content from other to t. +func (t Tree) CopyFrom(other Tree) { for _, node := range t { - // only copy entries for files - if node.Type != "file" { + // only process files and dirs + if node.Type != "file" && node.Type != "dir" { continue } @@ -108,14 +132,32 @@ func (t Tree) PopulateFrom(other Tree) error { continue } - // compare content - if node.SameContent(oldNode) { - // copy Content - node.Content = oldNode.Content + if node.Type == "file" { + // compare content + if node.SameContent(oldNode) { + // copy Content + node.Content = oldNode.Content + } + } else { + // fill in all subtrees from old subtree + node.tree.CopyFrom(*oldNode.tree) + + // check if tree has changed + if node.tree.Equals(*oldNode.tree) { + // if nothing has changed, copy subtree ID + node.Subtree = oldNode.Subtree + } } } +} - return nil +// Equals returns true if t and other have exactly the same nodes. +func (t Tree) Equals(other Tree) bool { + if len(t) != len(other) { + return false + } + + return reflect.DeepEqual(t, other) } func (t *Tree) Insert(node *Node) error { @@ -160,15 +202,41 @@ func (t Tree) Stat() Stat { s.Bytes += n.Size case "dir": s.Dirs++ - s.Add(n.Tree.Stat()) - default: - s.Other++ + if n.tree != nil { + s.Add(n.tree.Stat()) + } } } return s } +func (t Tree) StatTodo() Stat { + s := Stat{} + for _, n := range t { + switch n.Type { + case "file": + if n.Content == nil { + s.Files++ + s.Bytes += n.Size + } + case "dir": + if n.Subtree == nil { + s.Dirs++ + if n.tree != nil { + s.Add(n.tree.StatTodo()) + } + } + } + } + + return s +} + +func (node Node) Tree() *Tree { + return node.tree +} + func (node *Node) fill_extra(path string, fi os.FileInfo) (err error) { stat, ok := fi.Sys().(*syscall.Stat_t) if !ok { From 6eb969a4920a7a6cbe2f780f42cc728e07397e10 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 4 Jan 2015 22:58:58 +0100 Subject: [PATCH 6/6] Update modified files, store error message --- archiver.go | 20 +++++++++++++------- cmd/restic/cmd_fsck.go | 2 +- tree.go | 3 +++ 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/archiver.go b/archiver.go index 27418645b..6e475a5be 100644 --- a/archiver.go +++ b/archiver.go @@ -98,7 +98,7 @@ func (arch *Archiver) SaveFile(node *Node) error { file, err := os.Open(node.path) defer file.Close() if err != nil { - return arrar.Annotatef(err, "SaveFile(%v)", node.path) + return err } // check file again @@ -245,11 +245,7 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { arch.fileToken <- token }() - // TODO: handle error - err := arch.SaveFile(n) - if err != nil { - panic(err) - } + node.err = arch.SaveFile(n) arch.p.Report(Stat{Files: 1}) }(node) } @@ -259,9 +255,19 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { // check for invalid file nodes for _, node := range *t { - if node.Type == "file" && node.Content == nil { + if node.Type == "file" && node.Content 
== nil && node.err == nil { return Blob{}, fmt.Errorf("node %v has empty content", node.Name) } + + if node.err != nil { + err := arch.Error(node.path, nil, node.err) + if err != nil { + return Blob{}, err + } + + // save error message in node + node.Error = node.err.Error() + } } blob, err := arch.SaveJSON(backend.Tree, t) diff --git a/cmd/restic/cmd_fsck.go b/cmd/restic/cmd_fsck.go index 033a650a6..d349d96a2 100644 --- a/cmd/restic/cmd_fsck.go +++ b/cmd/restic/cmd_fsck.go @@ -99,7 +99,7 @@ func fsckTree(opts CmdFsck, ch *restic.ContentHandler, id backend.ID) error { switch node.Type { case "file": - if node.Content == nil { + if node.Content == nil && node.Error == "" { return fmt.Errorf("file node %q of tree %v has no content", node.Name, id) } diff --git a/tree.go b/tree.go index 64398ad09..7752e1d7c 100644 --- a/tree.go +++ b/tree.go @@ -39,6 +39,8 @@ type Node struct { Content []backend.ID `json:"content"` Subtree backend.ID `json:"subtree,omitempty"` + Error string `json:"error,omitempty"` + tree *Tree path string @@ -96,6 +98,7 @@ func LoadTree(ch *ContentHandler, id backend.ID) (Tree, error) { // LoadTreeRecursive loads the tree and all subtrees via ch. func LoadTreeRecursive(path string, ch *ContentHandler, id backend.ID) (Tree, error) { + // TODO: load subtrees in parallel tree, err := LoadTree(ch, id) if err != nil { return nil, err