From 79321a195cbdc5c88e8ad96d7b072255163db5a8 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 20 May 2022 23:44:44 +0200 Subject: [PATCH 1/7] archiver: remove dead attribute from FutureNode --- internal/archiver/archiver.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 0ed66db5d..94b964f4d 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -295,8 +295,7 @@ type FutureNode struct { snPath, target string // kept to call the error callback function - absTarget string - fi os.FileInfo + fi os.FileInfo node *restic.Node stats ItemStats @@ -366,8 +365,6 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous return FutureNode{}, false, err } - fn.absTarget = abstarget - // exclude files by path before running Lstat to reduce number of lstat calls if !arch.SelectByName(abstarget) { debug.Log("%v is excluded by path", target) From dcb00fd2d16084e51566beb69ab76df8a12bde92 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 22 May 2022 17:31:37 +0200 Subject: [PATCH 2/7] archiver: cleanup Saver interface --- internal/archiver/blob_saver.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go index 0e6959524..0bd7d1fd9 100644 --- a/internal/archiver/blob_saver.go +++ b/internal/archiver/blob_saver.go @@ -11,7 +11,6 @@ import ( // Saver allows saving a blob. type Saver interface { SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error) - Index() restic.MasterIndex } // BlobSaver concurrently saves incoming blobs to the repo. From 32f4997733413a084554e35c0048d5996b91964d Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 21 May 2022 00:31:26 +0200 Subject: [PATCH 3/7] archiver: remove unused fileInfo from progress callback --- cmd/restic/cmd_backup.go | 6 +++--- internal/archiver/archiver.go | 31 +++++++++++++--------------- internal/archiver/archiver_test.go | 10 ++++----- internal/archiver/scanner.go | 6 +++--- internal/archiver/scanner_test.go | 10 ++++----- internal/archiver/tree_saver.go | 2 +- internal/archiver/tree_saver_test.go | 5 ++--- internal/ui/backup/json.go | 5 ++--- internal/ui/backup/progress.go | 17 +++++++-------- internal/ui/backup/text.go | 5 ++--- 10 files changed, 45 insertions(+), 52 deletions(-) diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index ff8a41c6c..0b33f2263 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -647,7 +647,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina } errorHandler := func(item string, err error) error { - return progressReporter.Error(item, nil, err) + return progressReporter.Error(item, err) } messageHandler := func(msg string, args ...interface{}) { @@ -690,9 +690,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina arch.Select = selectFilter arch.WithAtime = opts.WithAtime success := true - arch.Error = func(item string, fi os.FileInfo, err error) error { + arch.Error = func(item string, err error) error { success = false - return progressReporter.Error(item, fi, err) + return progressReporter.Error(item, err) } arch.CompleteItem = progressReporter.CompleteItem arch.StartFile = progressReporter.StartFile diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 94b964f4d..dbf1faa21 100644 --- 
a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -27,7 +27,7 @@ type SelectFunc func(item string, fi os.FileInfo) bool // ErrorFunc is called when an error during archiving occurs. When nil is // returned, the archiver continues, otherwise it aborts and passes the error // up the call stack. -type ErrorFunc func(file string, fi os.FileInfo, err error) error +type ErrorFunc func(file string, err error) error // ItemStats collects some statistics about a particular file or directory. type ItemStats struct { @@ -157,7 +157,7 @@ func New(repo restic.Repository, fs fs.FS, opts Options) *Archiver { } // error calls arch.Error if it is set and the error is different from context.Canceled. -func (arch *Archiver) error(item string, fi os.FileInfo, err error) error { +func (arch *Archiver) error(item string, err error) error { if arch.Error == nil || err == nil { return err } @@ -166,7 +166,7 @@ func (arch *Archiver) error(item string, fi os.FileInfo, err error) error { return err } - errf := arch.Error(item, fi, err) + errf := arch.Error(item, err) if err != errf { debug.Log("item %v: error was filtered by handler, before: %q, after: %v", item, err, errf) } @@ -269,7 +269,7 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo // return error early if possible if err != nil { - err = arch.error(pathname, fi, err) + err = arch.error(pathname, err) if err == nil { // ignore error continue @@ -294,9 +294,6 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo type FutureNode struct { snPath, target string - // kept to call the error callback function - fi os.FileInfo - node *restic.Node stats ItemStats err error @@ -375,7 +372,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous fi, err := arch.FS.Lstat(target) if err != nil { debug.Log("lstat() for %v returned error: %v", target, err) - err = arch.error(abstarget, fi, err) + err = arch.error(abstarget, err) if err != nil { return FutureNode{}, false, errors.Wrap(err, "Lstat") } @@ -412,7 +409,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous debug.Log("%v hasn't changed, but contents are missing!", target) // There are contents missing - inform user! 
err := errors.Errorf("parts of %v not found in the repository index; storing the file again", target) - err = arch.error(abstarget, fi, err) + err = arch.error(abstarget, err) if err != nil { return FutureNode{}, false, err } @@ -423,7 +420,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous file, err := arch.FS.OpenFile(target, fs.O_RDONLY|fs.O_NOFOLLOW, 0) if err != nil { debug.Log("Openfile() for %v returned error: %v", target, err) - err = arch.error(abstarget, fi, err) + err = arch.error(abstarget, err) if err != nil { return FutureNode{}, false, errors.Wrap(err, "Lstat") } @@ -434,7 +431,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous if err != nil { debug.Log("stat() on opened file %v returned error: %v", target, err) _ = file.Close() - err = arch.error(abstarget, fi, err) + err = arch.error(abstarget, err) if err != nil { return FutureNode{}, false, errors.Wrap(err, "Lstat") } @@ -445,7 +442,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous if !fs.IsRegularFile(fi) { err = errors.Errorf("file %v changed type, refusing to archive") _ = file.Close() - err = arch.error(abstarget, fi, err) + err = arch.error(abstarget, err) if err != nil { return FutureNode{}, false, err } @@ -467,7 +464,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous start := time.Now() oldSubtree, err := arch.loadSubtree(ctx, previous) if err != nil { - err = arch.error(abstarget, fi, err) + err = arch.error(abstarget, err) } if err != nil { return FutureNode{}, false, err @@ -576,7 +573,7 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, fn, excluded, err := arch.Save(ctx, join(snPath, name), subatree.Path, previous.Find(name)) if err != nil { - err = arch.error(subatree.Path, fn.fi, err) + err = arch.error(subatree.Path, err) if err == nil { // ignore error continue @@ -600,7 +597,7 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, oldNode := previous.Find(name) oldSubtree, err := arch.loadSubtree(ctx, oldNode) if err != nil { - err = arch.error(join(snPath, name), nil, err) + err = arch.error(join(snPath, name), err) } if err != nil { return nil, err @@ -654,7 +651,7 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, // return the error, or ignore it if fn.err != nil { - fn.err = arch.error(fn.target, fn.fi, fn.err) + fn.err = arch.error(fn.target, fn.err) if fn.err == nil { // ignore error continue @@ -762,7 +759,7 @@ func (arch *Archiver) loadParentTree(ctx context.Context, snapshotID restic.ID) tree, err := restic.LoadTree(ctx, arch.Repo, *sn.Tree) if err != nil { debug.Log("unable to load tree %v: %v", *sn.Tree, err) - _ = arch.error("/", nil, arch.wrapLoadTreeError(*sn.Tree, err)) + _ = arch.error("/", arch.wrapLoadTreeError(*sn.Tree, err)) return nil } return tree diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index 6367a19cb..a5ba9262c 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -47,7 +47,7 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem arch := New(repo, filesystem, Options{}) arch.runWorkers(ctx, wg) - arch.Error = func(item string, fi os.FileInfo, err error) error { + arch.Error = func(item string, err error) error { t.Errorf("archiver error for %v: %v", item, err) return err } @@ -217,7 +217,7 @@ func TestArchiverSave(t *testing.T) { 
repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: fs.Local{}}, Options{}) - arch.Error = func(item string, fi os.FileInfo, err error) error { + arch.Error = func(item string, err error) error { t.Errorf("archiver error for %v: %v", item, err) return err } @@ -295,7 +295,7 @@ func TestArchiverSaveReaderFS(t *testing.T) { } arch := New(repo, readerFs, Options{}) - arch.Error = func(item string, fi os.FileInfo, err error) error { + arch.Error = func(item string, err error) error { t.Errorf("archiver error for %v: %v", item, err) return err } @@ -1723,7 +1723,7 @@ func TestArchiverParent(t *testing.T) { func TestArchiverErrorReporting(t *testing.T) { ignoreErrorForBasename := func(basename string) ErrorFunc { - return func(item string, fi os.FileInfo, err error) error { + return func(item string, err error) error { if filepath.Base(item) == "targetfile" { t.Logf("ignoring error for targetfile: %v", err) return nil @@ -2248,7 +2248,7 @@ func TestRacyFileSwap(t *testing.T) { repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: statfs}, Options{}) - arch.Error = func(item string, fi os.FileInfo, err error) error { + arch.Error = func(item string, err error) error { t.Logf("archiver error as expected for %v: %v", item, err) return err } diff --git a/internal/archiver/scanner.go b/internal/archiver/scanner.go index 5c8474259..6ce2a4700 100644 --- a/internal/archiver/scanner.go +++ b/internal/archiver/scanner.go @@ -27,7 +27,7 @@ func NewScanner(fs fs.FS) *Scanner { FS: fs, SelectByName: func(item string) bool { return true }, Select: func(item string, fi os.FileInfo) bool { return true }, - Error: func(item string, fi os.FileInfo, err error) error { return err }, + Error: func(item string, err error) error { return err }, Result: func(item string, s ScanStats) {}, } } @@ -111,7 +111,7 @@ func (s *Scanner) scan(ctx context.Context, stats ScanStats, target string) (Sca // get file information fi, err := s.FS.Lstat(target) if err != nil { - return stats, s.Error(target, fi, err) + return stats, s.Error(target, err) } // run remaining select functions that require file information @@ -126,7 +126,7 @@ func (s *Scanner) scan(ctx context.Context, stats ScanStats, target string) (Sca case fi.Mode().IsDir(): names, err := readdirnames(s.FS, target, fs.O_NOFOLLOW) if err != nil { - return stats, s.Error(target, fi, err) + return stats, s.Error(target, err) } sort.Strings(names) diff --git a/internal/archiver/scanner_test.go b/internal/archiver/scanner_test.go index 6c2d35d81..87d8c887d 100644 --- a/internal/archiver/scanner_test.go +++ b/internal/archiver/scanner_test.go @@ -133,7 +133,7 @@ func TestScannerError(t *testing.T) { src TestDir result ScanStats selFn SelectFunc - errFn func(t testing.TB, item string, fi os.FileInfo, err error) error + errFn func(t testing.TB, item string, err error) error resFn func(t testing.TB, item string, s ScanStats) prepare func(t testing.TB) }{ @@ -173,7 +173,7 @@ func TestScannerError(t *testing.T) { t.Fatal(err) } }, - errFn: func(t testing.TB, item string, fi os.FileInfo, err error) error { + errFn: func(t testing.TB, item string, err error) error { if item == filepath.FromSlash("work/subdir") { return nil } @@ -198,7 +198,7 @@ func TestScannerError(t *testing.T) { } } }, - errFn: func(t testing.TB, item string, fi os.FileInfo, err error) error { + errFn: func(t testing.TB, item string, err error) error { if item == "foo" { t.Logf("ignoring error for %v: %v", item, err) return nil @@ -257,13 +257,13 @@ func TestScannerError(t *testing.T) { } } if 
test.errFn != nil { - sc.Error = func(item string, fi os.FileInfo, err error) error { + sc.Error = func(item string, err error) error { p, relErr := filepath.Rel(cur, item) if relErr != nil { panic(relErr) } - return test.errFn(t, p, fi, err) + return test.errFn(t, p, err) } } diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go index afa58be40..4ed033fac 100644 --- a/internal/archiver/tree_saver.go +++ b/internal/archiver/tree_saver.go @@ -114,7 +114,7 @@ func (s *TreeSaver) save(ctx context.Context, snPath string, node *restic.Node, // return the error if it wasn't ignored if fn.err != nil { debug.Log("err for %v: %v", fn.snPath, fn.err) - fn.err = s.errFn(fn.target, fn.fi, fn.err) + fn.err = s.errFn(fn.target, fn.err) if fn.err == nil { // ignore error continue diff --git a/internal/archiver/tree_saver_test.go b/internal/archiver/tree_saver_test.go index 8ffafcaad..e7314e8f8 100644 --- a/internal/archiver/tree_saver_test.go +++ b/internal/archiver/tree_saver_test.go @@ -3,7 +3,6 @@ package archiver import ( "context" "fmt" - "os" "runtime" "sync/atomic" "testing" @@ -23,7 +22,7 @@ func TestTreeSaver(t *testing.T) { return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil } - errFn := func(snPath string, fi os.FileInfo, err error) error { + errFn := func(snPath string, err error) error { return nil } @@ -83,7 +82,7 @@ func TestTreeSaverError(t *testing.T) { return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil } - errFn := func(snPath string, fi os.FileInfo, err error) error { + errFn := func(snPath string, err error) error { t.Logf("ignoring error %v\n", err) return nil } diff --git a/internal/ui/backup/json.go b/internal/ui/backup/json.go index 3e8ce5e94..1cbd0c197 100644 --- a/internal/ui/backup/json.go +++ b/internal/ui/backup/json.go @@ -3,7 +3,6 @@ package backup import ( "bytes" "encoding/json" - "os" "sort" "time" @@ -79,7 +78,7 @@ func (b *JSONProgress) Update(total, processed Counter, errors uint, currentFile // ScannerError is the error callback function for the scanner, it prints the // error in verbose mode and returns nil. -func (b *JSONProgress) ScannerError(item string, fi os.FileInfo, err error) error { +func (b *JSONProgress) ScannerError(item string, err error) error { b.error(errorUpdate{ MessageType: "error", Error: err, @@ -90,7 +89,7 @@ func (b *JSONProgress) ScannerError(item string, fi os.FileInfo, err error) erro } // Error is the error callback function for the archiver, it prints the error and returns nil. 
-func (b *JSONProgress) Error(item string, fi os.FileInfo, err error) error { +func (b *JSONProgress) Error(item string, err error) error { b.error(errorUpdate{ MessageType: "error", Error: err, diff --git a/internal/ui/backup/progress.go b/internal/ui/backup/progress.go index 781ac289b..a4b641fe9 100644 --- a/internal/ui/backup/progress.go +++ b/internal/ui/backup/progress.go @@ -3,7 +3,6 @@ package backup import ( "context" "io" - "os" "sync" "time" @@ -14,8 +13,8 @@ import ( type ProgressPrinter interface { Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) - Error(item string, fi os.FileInfo, err error) error - ScannerError(item string, fi os.FileInfo, err error) error + Error(item string, err error) error + ScannerError(item string, err error) error CompleteItem(messageType string, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) ReportTotal(item string, start time.Time, s archiver.ScanStats) Finish(snapshotID restic.ID, start time.Time, summary *Summary, dryRun bool) @@ -44,11 +43,11 @@ type ProgressReporter interface { CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) StartFile(filename string) CompleteBlob(filename string, bytes uint64) - ScannerError(item string, fi os.FileInfo, err error) error + ScannerError(item string, err error) error ReportTotal(item string, s archiver.ScanStats) SetMinUpdatePause(d time.Duration) Run(ctx context.Context) error - Error(item string, fi os.FileInfo, err error) error + Error(item string, err error) error Finish(snapshotID restic.ID) } @@ -173,13 +172,13 @@ func (p *Progress) Run(ctx context.Context) error { // ScannerError is the error callback function for the scanner, it prints the // error in verbose mode and returns nil. -func (p *Progress) ScannerError(item string, fi os.FileInfo, err error) error { - return p.printer.ScannerError(item, fi, err) +func (p *Progress) ScannerError(item string, err error) error { + return p.printer.ScannerError(item, err) } // Error is the error callback function for the archiver, it prints the error and returns nil. -func (p *Progress) Error(item string, fi os.FileInfo, err error) error { - cbErr := p.printer.Error(item, fi, err) +func (p *Progress) Error(item string, err error) error { + cbErr := p.printer.Error(item, err) select { case p.errCh <- struct{}{}: diff --git a/internal/ui/backup/text.go b/internal/ui/backup/text.go index 801def0db..03013bec1 100644 --- a/internal/ui/backup/text.go +++ b/internal/ui/backup/text.go @@ -2,7 +2,6 @@ package backup import ( "fmt" - "os" "sort" "time" @@ -75,13 +74,13 @@ func (b *TextProgress) Update(total, processed Counter, errors uint, currentFile // ScannerError is the error callback function for the scanner, it prints the // error in verbose mode and returns nil. -func (b *TextProgress) ScannerError(item string, fi os.FileInfo, err error) error { +func (b *TextProgress) ScannerError(item string, err error) error { b.V("scan: %v\n", err) return nil } // Error is the error callback function for the archiver, it prints the error and returns nil. 
-func (b *TextProgress) Error(item string, fi os.FileInfo, err error) error { +func (b *TextProgress) Error(item string, err error) error { b.E("error: %v\n", err) return nil } From c206a101a3ea6a701ad2089c12837c7e955da25b Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 29 May 2022 11:57:10 +0200 Subject: [PATCH 4/7] archiver: unify FutureTree/File into futureNode There is no real difference between the FutureTree and FutureFile structs. However, differentiating both increases the size of the FutureNode struct. The FutureNode struct is now only 16 bytes large on 64bit platforms. That way it has a very low overhead if the corresponding file/directory was not processed yet. There is a special case for nodes that were reused from the parent snapshot, as a go channel seems to have 96 bytes overhead which would result in a memory usage regression. --- internal/archiver/archiver.go | 121 +++++++++++++++------------ internal/archiver/archiver_test.go | 52 ++++++------ internal/archiver/file_saver.go | 86 +++++++------------ internal/archiver/file_saver_test.go | 10 +-- internal/archiver/tree_saver.go | 87 +++++++------------ internal/archiver/tree_saver_test.go | 12 +-- 6 files changed, 163 insertions(+), 205 deletions(-) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index dbf1faa21..247d922de 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -239,17 +239,17 @@ func (arch *Archiver) wrapLoadTreeError(id restic.ID, err error) error { // SaveDir stores a directory in the repo and returns the node. snPath is the // path within the current snapshot. -func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree, complete CompleteFunc) (d FutureTree, err error) { +func (arch *Archiver) SaveDir(ctx context.Context, snPath string, dir string, fi os.FileInfo, previous *restic.Tree, complete CompleteFunc) (d FutureNode, err error) { debug.Log("%v %v", snPath, dir) treeNode, err := arch.nodeFromFileInfo(dir, fi) if err != nil { - return FutureTree{}, err + return FutureNode{}, err } names, err := readdirnames(arch.FS, dir, fs.O_NOFOLLOW) if err != nil { - return FutureTree{}, err + return FutureNode{}, err } sort.Strings(names) @@ -259,7 +259,7 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo // test if context has been cancelled if ctx.Err() != nil { debug.Log("context has been cancelled, aborting") - return FutureTree{}, ctx.Err() + return FutureNode{}, ctx.Err() } pathname := arch.FS.Join(dir, name) @@ -275,7 +275,7 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo continue } - return FutureTree{}, err + return FutureNode{}, err } if excluded { @@ -285,50 +285,58 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo nodes = append(nodes, fn) } - ft := arch.treeSaver.Save(ctx, snPath, treeNode, nodes, complete) + fn := arch.treeSaver.Save(ctx, snPath, dir, treeNode, nodes, complete) - return ft, nil + return fn, nil } -// FutureNode holds a reference to a node, FutureFile, or FutureTree. +// FutureNode holds a reference to a channel that returns a FutureNodeResult +// or a reference to an already existing result. If the result is available +// immediately, then storing a reference directly requires less memory than +// using the indirection via a channel. 
type FutureNode struct { + ch <-chan futureNodeResult + res *futureNodeResult +} + +type futureNodeResult struct { snPath, target string node *restic.Node stats ItemStats err error - - isFile bool - file FutureFile - isTree bool - tree FutureTree } -func (fn *FutureNode) wait(ctx context.Context) { - switch { - case fn.isFile: - // wait for and collect the data for the file - fn.file.Wait(ctx) - fn.node = fn.file.Node() - fn.err = fn.file.Err() - fn.stats = fn.file.Stats() +func newFutureNode() (FutureNode, chan<- futureNodeResult) { + ch := make(chan futureNodeResult, 1) + return FutureNode{ch: ch}, ch +} - // ensure the other stuff can be garbage-collected - fn.file = FutureFile{} - fn.isFile = false - - case fn.isTree: - // wait for and collect the data for the dir - fn.tree.Wait(ctx) - fn.node = fn.tree.Node() - fn.stats = fn.tree.Stats() - - // ensure the other stuff can be garbage-collected - fn.tree = FutureTree{} - fn.isTree = false +func newFutureNodeWithResult(res futureNodeResult) FutureNode { + return FutureNode{ + res: &res, } } +func (fn *FutureNode) take(ctx context.Context) futureNodeResult { + if fn.res != nil { + res := fn.res + // free result + fn.res = nil + return *res + } + select { + case res, ok := <-fn.ch: + if ok { + // free channel + fn.ch = nil + return res + } + case <-ctx.Done(): + } + return futureNodeResult{} +} + // allBlobsPresent checks if all blobs (contents) of the given node are // present in the index. func (arch *Archiver) allBlobsPresent(previous *restic.Node) bool { @@ -351,11 +359,6 @@ func (arch *Archiver) allBlobsPresent(previous *restic.Node) bool { func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous *restic.Node) (fn FutureNode, excluded bool, err error) { start := time.Now() - fn = FutureNode{ - snPath: snPath, - target: target, - } - debug.Log("%v target %q, previous %v", snPath, target, previous) abstarget, err := arch.FS.Abs(target) if err != nil { @@ -395,14 +398,19 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous debug.Log("%v hasn't changed, using old list of blobs", target) arch.CompleteItem(snPath, previous, previous, ItemStats{}, time.Since(start)) arch.CompleteBlob(snPath, previous.Size) - fn.node, err = arch.nodeFromFileInfo(target, fi) + node, err := arch.nodeFromFileInfo(target, fi) if err != nil { return FutureNode{}, false, err } // copy list of blobs - fn.node.Content = previous.Content + node.Content = previous.Content + fn = newFutureNodeWithResult(futureNodeResult{ + snPath: snPath, + target: target, + node: node, + }) return fn, false, nil } @@ -449,9 +457,8 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous return FutureNode{}, true, nil } - fn.isFile = true // Save will close the file, we don't need to do that - fn.file = arch.fileSaver.Save(ctx, snPath, file, fi, func() { + fn = arch.fileSaver.Save(ctx, snPath, target, file, fi, func() { arch.StartFile(snPath) }, func(node *restic.Node, stats ItemStats) { arch.CompleteItem(snPath, previous, node, stats, time.Since(start)) @@ -470,8 +477,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous return FutureNode{}, false, err } - fn.isTree = true - fn.tree, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree, + fn, err = arch.SaveDir(ctx, snPath, target, fi, oldSubtree, func(node *restic.Node, stats ItemStats) { arch.CompleteItem(snItem, previous, node, stats, time.Since(start)) }) @@ -487,10 +493,15 @@ func (arch *Archiver) Save(ctx 
context.Context, snPath, target string, previous default: debug.Log(" %v other", target) - fn.node, err = arch.nodeFromFileInfo(target, fi) + node, err := arch.nodeFromFileInfo(target, fi) if err != nil { return FutureNode{}, false, err } + fn = newFutureNodeWithResult(futureNodeResult{ + snPath: snPath, + target: target, + node: node, + }) } debug.Log("return after %.3f", time.Since(start).Seconds()) @@ -647,28 +658,28 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, // process all futures for name, fn := range futureNodes { - fn.wait(ctx) + fnr := fn.take(ctx) // return the error, or ignore it - if fn.err != nil { - fn.err = arch.error(fn.target, fn.err) - if fn.err == nil { + if fnr.err != nil { + fnr.err = arch.error(fnr.target, fnr.err) + if fnr.err == nil { // ignore error continue } - return nil, fn.err + return nil, fnr.err } // when the error is ignored, the node could not be saved, so ignore it - if fn.node == nil { - debug.Log("%v excluded: %v", fn.snPath, fn.target) + if fnr.node == nil { + debug.Log("%v excluded: %v", fnr.snPath, fnr.target) continue } - fn.node.Name = name + fnr.node.Name = name - err := tree.Insert(fn.node) + err := tree.Insert(fnr.node) if err != nil { return nil, err } diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index a5ba9262c..a6485234f 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -80,11 +80,11 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem t.Fatal(err) } - res := arch.fileSaver.Save(ctx, "/", file, fi, start, complete) + res := arch.fileSaver.Save(ctx, "/", filename, file, fi, start, complete) - res.Wait(ctx) - if res.Err() != nil { - t.Fatal(res.Err()) + fnr := res.take(ctx) + if fnr.err != nil { + t.Fatal(fnr.err) } arch.stopWorkers() @@ -109,15 +109,15 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem t.Errorf("no node returned for complete callback") } - if completeCallbackNode != nil && !res.Node().Equals(*completeCallbackNode) { + if completeCallbackNode != nil && !fnr.node.Equals(*completeCallbackNode) { t.Errorf("different node returned for complete callback") } - if completeCallbackStats != res.Stats() { - t.Errorf("different stats return for complete callback, want:\n %v\ngot:\n %v", res.Stats(), completeCallbackStats) + if completeCallbackStats != fnr.stats { + t.Errorf("different stats return for complete callback, want:\n %v\ngot:\n %v", fnr.stats, completeCallbackStats) } - return res.Node(), res.Stats() + return fnr.node, fnr.stats } func TestArchiverSaveFile(t *testing.T) { @@ -232,16 +232,16 @@ func TestArchiverSave(t *testing.T) { t.Errorf("Save() excluded the node, that's unexpected") } - node.wait(ctx) - if node.err != nil { - t.Fatal(node.err) + fnr := node.take(ctx) + if fnr.err != nil { + t.Fatal(fnr.err) } - if node.node == nil { + if fnr.node == nil { t.Fatalf("returned node is nil") } - stats := node.stats + stats := fnr.stats arch.stopWorkers() err = repo.Flush(ctx) @@ -249,7 +249,7 @@ func TestArchiverSave(t *testing.T) { t.Fatal(err) } - TestEnsureFileContent(ctx, t, repo, "file", node.node, testfile) + TestEnsureFileContent(ctx, t, repo, "file", fnr.node, testfile) if stats.DataSize != uint64(len(testfile.Content)) { t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(testfile.Content), stats.DataSize) } @@ -311,16 +311,16 @@ func TestArchiverSaveReaderFS(t *testing.T) { t.Errorf("Save() excluded the node, that's 
unexpected") } - node.wait(ctx) - if node.err != nil { - t.Fatal(node.err) + fnr := node.take(ctx) + if fnr.err != nil { + t.Fatal(fnr.err) } - if node.node == nil { + if fnr.node == nil { t.Fatalf("returned node is nil") } - stats := node.stats + stats := fnr.stats arch.stopWorkers() err = repo.Flush(ctx) @@ -328,7 +328,7 @@ func TestArchiverSaveReaderFS(t *testing.T) { t.Fatal(err) } - TestEnsureFileContent(ctx, t, repo, "file", node.node, TestFile{Content: test.Data}) + TestEnsureFileContent(ctx, t, repo, "file", fnr.node, TestFile{Content: test.Data}) if stats.DataSize != uint64(len(test.Data)) { t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(test.Data), stats.DataSize) } @@ -851,13 +851,13 @@ func TestArchiverSaveDir(t *testing.T) { t.Fatal(err) } - ft, err := arch.SaveDir(ctx, "/", fi, test.target, nil, nil) + ft, err := arch.SaveDir(ctx, "/", test.target, fi, nil, nil) if err != nil { t.Fatal(err) } - ft.Wait(ctx) - node, stats := ft.Node(), ft.Stats() + fnr := ft.take(ctx) + node, stats := fnr.node, fnr.stats t.Logf("stats: %v", stats) if stats.DataSize != 0 { @@ -928,13 +928,13 @@ func TestArchiverSaveDirIncremental(t *testing.T) { t.Fatal(err) } - ft, err := arch.SaveDir(ctx, "/", fi, tempdir, nil, nil) + ft, err := arch.SaveDir(ctx, "/", tempdir, fi, nil, nil) if err != nil { t.Fatal(err) } - ft.Wait(ctx) - node, stats := ft.Node(), ft.Stats() + fnr := ft.take(ctx) + node, stats := fnr.node, fnr.stats if err != nil { t.Fatal(err) diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index 0ec871e8b..1e6eea979 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -13,41 +13,6 @@ import ( "golang.org/x/sync/errgroup" ) -// FutureFile is returned by Save and will return the data once it -// has been processed. -type FutureFile struct { - ch <-chan saveFileResponse - res saveFileResponse -} - -// Wait blocks until the result of the save operation is received or ctx is -// cancelled. -func (s *FutureFile) Wait(ctx context.Context) { - select { - case res, ok := <-s.ch: - if ok { - s.res = res - } - case <-ctx.Done(): - return - } -} - -// Node returns the node once it is available. -func (s *FutureFile) Node() *restic.Node { - return s.res.node -} - -// Stats returns the stats for the file once they are available. -func (s *FutureFile) Stats() ItemStats { - return s.res.stats -} - -// Err returns the error in case an error occurred. -func (s *FutureFile) Err() error { - return s.res.err -} - // SaveBlobFn saves a blob to a repo. type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob @@ -102,10 +67,11 @@ type CompleteFunc func(*restic.Node, ItemStats) // Save stores the file f and returns the data once it has been completed. The // file is closed by Save. 
-func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureFile { - ch := make(chan saveFileResponse, 1) +func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureNode { + fn, ch := newFutureNode() job := saveFileJob{ snPath: snPath, + target: target, file: file, fi: fi, start: start, @@ -121,41 +87,42 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os close(ch) } - return FutureFile{ch: ch} + return fn } type saveFileJob struct { snPath string + target string file fs.File fi os.FileInfo - ch chan<- saveFileResponse + ch chan<- futureNodeResult complete CompleteFunc start func() } -type saveFileResponse struct { - node *restic.Node - stats ItemStats - err error -} - // saveFile stores the file f in the repo, then closes it. -func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, f fs.File, fi os.FileInfo, start func()) saveFileResponse { +func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func()) futureNodeResult { start() stats := ItemStats{} + fnr := futureNodeResult{ + snPath: snPath, + target: target, + } debug.Log("%v", snPath) node, err := s.NodeFromFileInfo(f.Name(), fi) if err != nil { _ = f.Close() - return saveFileResponse{err: err} + fnr.err = err + return fnr } if node.Type != "file" { _ = f.Close() - return saveFileResponse{err: errors.Errorf("node type %q is wrong", node.Type)} + fnr.err = errors.Errorf("node type %q is wrong", node.Type) + return fnr } // reuse the chunker @@ -179,13 +146,15 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat if err != nil { _ = f.Close() - return saveFileResponse{err: err} + fnr.err = err + return fnr } // test if the context has been cancelled, return the error if ctx.Err() != nil { _ = f.Close() - return saveFileResponse{err: ctx.Err()} + fnr.err = ctx.Err() + return fnr } res := s.saveBlob(ctx, restic.DataBlob, buf) @@ -194,7 +163,8 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat // test if the context has been cancelled, return the error if ctx.Err() != nil { _ = f.Close() - return saveFileResponse{err: ctx.Err()} + fnr.err = ctx.Err() + return fnr } s.CompleteBlob(f.Name(), uint64(len(chunk.Data))) @@ -202,7 +172,8 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat err = f.Close() if err != nil { - return saveFileResponse{err: err} + fnr.err = err + return fnr } for _, res := range results { @@ -217,11 +188,9 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat } node.Size = size - - return saveFileResponse{ - node: node, - stats: stats, - } + fnr.node = node + fnr.stats = stats + return fnr } func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { @@ -239,7 +208,8 @@ func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { return } } - res := s.saveFile(ctx, chnker, job.snPath, job.file, job.fi, job.start) + + res := s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start) if job.complete != nil { job.complete(res.node, res.stats) } diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go index 497882fcb..0bdb8ad50 100644 --- a/internal/archiver/file_saver_test.go +++ 
b/internal/archiver/file_saver_test.go @@ -64,7 +64,7 @@ func TestFileSaver(t *testing.T) { testFs := fs.Local{} s, ctx, wg := startFileSaver(ctx, t) - var results []FutureFile + var results []FutureNode for _, filename := range files { f, err := testFs.Open(filename) @@ -77,14 +77,14 @@ func TestFileSaver(t *testing.T) { t.Fatal(err) } - ff := s.Save(ctx, filename, f, fi, startFn, completeFn) + ff := s.Save(ctx, filename, filename, f, fi, startFn, completeFn) results = append(results, ff) } for _, file := range results { - file.Wait(ctx) - if file.Err() != nil { - t.Errorf("unable to save file: %v", file.Err()) + fnr := file.take(ctx) + if fnr.err != nil { + t.Errorf("unable to save file: %v", fnr.err) } } diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go index 4ed033fac..221df85e1 100644 --- a/internal/archiver/tree_saver.go +++ b/internal/archiver/tree_saver.go @@ -8,35 +8,6 @@ import ( "golang.org/x/sync/errgroup" ) -// FutureTree is returned by Save and will return the data once it -// has been processed. -type FutureTree struct { - ch <-chan saveTreeResponse - res saveTreeResponse -} - -// Wait blocks until the data has been received or ctx is cancelled. -func (s *FutureTree) Wait(ctx context.Context) { - select { - case <-ctx.Done(): - return - case res, ok := <-s.ch: - if ok { - s.res = res - } - } -} - -// Node returns the node. -func (s *FutureTree) Node() *restic.Node { - return s.res.node -} - -// Stats returns the stats for the file. -func (s *FutureTree) Stats() ItemStats { - return s.res.stats -} - // TreeSaver concurrently saves incoming trees to the repo. type TreeSaver struct { saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) @@ -70,10 +41,11 @@ func (s *TreeSaver) TriggerShutdown() { } // Save stores the dir d and returns the data once it has been completed. -func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode, complete CompleteFunc) FutureTree { - ch := make(chan saveTreeResponse, 1) +func (s *TreeSaver) Save(ctx context.Context, snPath string, target string, node *restic.Node, nodes []FutureNode, complete CompleteFunc) FutureNode { + fn, ch := newFutureNode() job := saveTreeJob{ snPath: snPath, + target: target, node: node, nodes: nodes, ch: ch, @@ -86,51 +58,53 @@ func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, close(ch) } - return FutureTree{ch: ch} + return fn } type saveTreeJob struct { snPath string - nodes []FutureNode + target string node *restic.Node - ch chan<- saveTreeResponse + nodes []FutureNode + ch chan<- futureNodeResult complete CompleteFunc } -type saveTreeResponse struct { - node *restic.Node - stats ItemStats -} - // save stores the nodes as a tree in the repo. 
-func (s *TreeSaver) save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) (*restic.Node, ItemStats, error) { +func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, ItemStats, error) { var stats ItemStats + node := job.node + nodes := job.nodes + // allow GC of nodes array once the loop is finished + job.nodes = nil tree := restic.NewTree(len(nodes)) - for _, fn := range nodes { - fn.wait(ctx) + for i, fn := range nodes { + // fn is a copy, so clear the original value explicitly + nodes[i] = FutureNode{} + fnr := fn.take(ctx) // return the error if it wasn't ignored - if fn.err != nil { - debug.Log("err for %v: %v", fn.snPath, fn.err) - fn.err = s.errFn(fn.target, fn.err) - if fn.err == nil { + if fnr.err != nil { + debug.Log("err for %v: %v", fnr.snPath, fnr.err) + fnr.err = s.errFn(fnr.target, fnr.err) + if fnr.err == nil { // ignore error continue } - return nil, stats, fn.err + return nil, stats, fnr.err } // when the error is ignored, the node could not be saved, so ignore it - if fn.node == nil { - debug.Log("%v excluded: %v", fn.snPath, fn.target) + if fnr.node == nil { + debug.Log("%v excluded: %v", fnr.snPath, fnr.target) continue } - debug.Log("insert %v", fn.node.Name) - err := tree.Insert(fn.node) + debug.Log("insert %v", fnr.node.Name) + err := tree.Insert(fnr.node) if err != nil { return nil, stats, err } @@ -158,7 +132,8 @@ func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error { return nil } } - node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes) + + node, stats, err := s.save(ctx, &job) if err != nil { debug.Log("error saving tree blob: %v", err) close(job.ch) @@ -168,9 +143,11 @@ func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error { if job.complete != nil { job.complete(node, stats) } - job.ch <- saveTreeResponse{ - node: node, - stats: stats, + job.ch <- futureNodeResult{ + snPath: job.snPath, + target: job.target, + node: node, + stats: stats, } close(job.ch) } diff --git a/internal/archiver/tree_saver_test.go b/internal/archiver/tree_saver_test.go index e7314e8f8..7a152ff0c 100644 --- a/internal/archiver/tree_saver_test.go +++ b/internal/archiver/tree_saver_test.go @@ -28,19 +28,19 @@ func TestTreeSaver(t *testing.T) { b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), saveFn, errFn) - var results []FutureTree + var results []FutureNode for i := 0; i < 20; i++ { node := &restic.Node{ Name: fmt.Sprintf("file-%d", i), } - fb := b.Save(ctx, "/", node, nil, nil) + fb := b.Save(ctx, "/", node.Name, node, nil, nil) results = append(results, fb) } for _, tree := range results { - tree.Wait(ctx) + tree.take(ctx) } b.TriggerShutdown() @@ -89,19 +89,19 @@ func TestTreeSaverError(t *testing.T) { b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), saveFn, errFn) - var results []FutureTree + var results []FutureNode for i := 0; i < test.trees; i++ { node := &restic.Node{ Name: fmt.Sprintf("file-%d", i), } - fb := b.Save(ctx, "/", node, nil, nil) + fb := b.Save(ctx, "/", node.Name, node, nil, nil) results = append(results, fb) } for _, tree := range results { - tree.Wait(ctx) + tree.take(ctx) } b.TriggerShutdown() From b817681a11b781cd04cc4c14efad87990ee8517d Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 21 May 2022 13:33:08 +0200 Subject: [PATCH 5/7] archiver: Incrementally serialize tree nodes That way it is not necessary to keep both the Nodes forming a Tree and the serialized JSON version in memory. 
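As a rough illustration (not part of the patch itself): the TreeJSONBuilder introduced below can be driven as sketched here. Only NewTreeJSONBuilder, AddNode and Finalize come from this patch; the node values are made up for the example, and the snippet assumes it is compiled inside the restic module, since internal packages cannot be imported from outside.

package main

import (
	"fmt"

	"github.com/restic/restic/internal/restic"
)

func main() {
	builder := restic.NewTreeJSONBuilder()

	// AddNode serializes each node immediately and requires ascending
	// name order, so the complete node list never has to be kept in
	// memory alongside its JSON form.
	for _, name := range []string{"a.txt", "b.txt", "c.txt"} {
		node := &restic.Node{Name: name, Type: "file"}
		if err := builder.AddNode(node); err != nil {
			panic(err)
		}
	}

	// Finalize returns the same bytes that marshalling a complete
	// restic.Tree would produce, including the trailing newline.
	buf, err := builder.Finalize()
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s", buf)
}
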
--- internal/archiver/archiver.go | 23 +++++++------ internal/archiver/tree_saver.go | 10 +++--- internal/archiver/tree_saver_test.go | 4 +-- internal/restic/tree.go | 50 ++++++++++++++++++++++++++++ internal/restic/tree_test.go | 31 +++++++++++++++++ 5 files changed, 101 insertions(+), 17 deletions(-) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 247d922de..3dae576d9 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -2,7 +2,6 @@ package archiver import ( "context" - "encoding/json" "os" "path" "runtime" @@ -175,17 +174,13 @@ func (arch *Archiver) error(item string, err error) error { // saveTree stores a tree in the repo. It checks the index and the known blobs // before saving anything. -func (arch *Archiver) saveTree(ctx context.Context, t *restic.Tree) (restic.ID, ItemStats, error) { +func (arch *Archiver) saveTree(ctx context.Context, t *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) { var s ItemStats - buf, err := json.Marshal(t) + buf, err := t.Finalize() if err != nil { - return restic.ID{}, s, errors.Wrap(err, "MarshalJSON") + return restic.ID{}, s, err } - // append a newline so that the data is always consistent (json.Encoder - // adds a newline after each object) - buf = append(buf, '\n') - b := &Buffer{Data: buf} res := arch.blobSaver.Save(ctx, restic.TreeBlob, b) @@ -620,7 +615,11 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, return nil, err } - id, nodeStats, err := arch.saveTree(ctx, subtree) + tb, err := restic.TreeToBuilder(subtree) + if err != nil { + return nil, err + } + id, nodeStats, err := arch.saveTree(ctx, tb) if err != nil { return nil, err } @@ -834,7 +833,11 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps return errors.New("snapshot is empty") } - rootTreeID, stats, err = arch.saveTree(wgCtx, tree) + tb, err := restic.TreeToBuilder(tree) + if err != nil { + return err + } + rootTreeID, stats, err = arch.saveTree(wgCtx, tb) arch.stopWorkers() return err }) diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go index 221df85e1..5aab09b94 100644 --- a/internal/archiver/tree_saver.go +++ b/internal/archiver/tree_saver.go @@ -10,7 +10,7 @@ import ( // TreeSaver concurrently saves incoming trees to the repo. type TreeSaver struct { - saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) + saveTree func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) errFn ErrorFunc ch chan<- saveTreeJob @@ -18,7 +18,7 @@ type TreeSaver struct { // NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is // started, it is stopped when ctx is cancelled. 
-func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver { +func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveTree func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver { ch := make(chan saveTreeJob) s := &TreeSaver{ @@ -78,7 +78,7 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I // allow GC of nodes array once the loop is finished job.nodes = nil - tree := restic.NewTree(len(nodes)) + builder := restic.NewTreeJSONBuilder() for i, fn := range nodes { // fn is a copy, so clear the original value explicitly @@ -104,13 +104,13 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I } debug.Log("insert %v", fnr.node.Name) - err := tree.Insert(fnr.node) + err := builder.AddNode(fnr.node) if err != nil { return nil, stats, err } } - id, treeStats, err := s.saveTree(ctx, tree) + id, treeStats, err := s.saveTree(ctx, builder) stats.Add(treeStats) if err != nil { return nil, stats, err diff --git a/internal/archiver/tree_saver_test.go b/internal/archiver/tree_saver_test.go index 7a152ff0c..36e585ae1 100644 --- a/internal/archiver/tree_saver_test.go +++ b/internal/archiver/tree_saver_test.go @@ -18,7 +18,7 @@ func TestTreeSaver(t *testing.T) { wg, ctx := errgroup.WithContext(ctx) - saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) { + saveFn := func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) { return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil } @@ -73,7 +73,7 @@ func TestTreeSaverError(t *testing.T) { wg, ctx := errgroup.WithContext(ctx) var num int32 - saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) { + saveFn := func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) { val := atomic.AddInt32(&num, 1) if val == test.failAt { t.Logf("sending error for request %v\n", test.failAt) diff --git a/internal/restic/tree.go b/internal/restic/tree.go index 33d1ec577..d1264074c 100644 --- a/internal/restic/tree.go +++ b/internal/restic/tree.go @@ -1,6 +1,7 @@ package restic import ( + "bytes" "context" "encoding/json" "fmt" @@ -143,3 +144,52 @@ func SaveTree(ctx context.Context, r BlobSaver, t *Tree) (ID, error) { id, _, _, err := r.SaveBlob(ctx, TreeBlob, buf, ID{}, false) return id, err } + +type TreeJSONBuilder struct { + buf bytes.Buffer + lastName string +} + +func NewTreeJSONBuilder() *TreeJSONBuilder { + tb := &TreeJSONBuilder{} + _, _ = tb.buf.WriteString(`{"nodes":[`) + return tb +} + +func (builder *TreeJSONBuilder) AddNode(node *Node) error { + if node.Name <= builder.lastName { + return errors.Errorf("nodes are not ordered got %q, last %q", node.Name, builder.lastName) + } + if builder.lastName != "" { + _ = builder.buf.WriteByte(',') + } + builder.lastName = node.Name + + val, err := json.Marshal(node) + if err != nil { + return err + } + _, _ = builder.buf.Write(val) + return nil +} + +func (builder *TreeJSONBuilder) Finalize() ([]byte, error) { + // append a newline so that the data is always consistent (json.Encoder + // adds a newline after each object) + _, _ = builder.buf.WriteString("]}\n") + buf := builder.buf.Bytes() + // drop reference to buffer + builder.buf = bytes.Buffer{} + return buf, nil +} + +func TreeToBuilder(t *Tree) (*TreeJSONBuilder, error) { + builder := NewTreeJSONBuilder() + for _, 
node := range t.Nodes { + err := builder.AddNode(node) + if err != nil { + return nil, err + } + } + return builder, nil +} diff --git a/internal/restic/tree_test.go b/internal/restic/tree_test.go index 3ed3e7938..811f0c6c6 100644 --- a/internal/restic/tree_test.go +++ b/internal/restic/tree_test.go @@ -119,6 +119,37 @@ func TestEmptyLoadTree(t *testing.T) { tree, tree2) } +func TestTreeEqualSerialization(t *testing.T) { + files := []string{"node.go", "tree.go", "tree_test.go"} + for i := 1; i <= len(files); i++ { + tree := restic.NewTree(i) + builder := restic.NewTreeJSONBuilder() + + for _, fn := range files[:i] { + fi, err := os.Lstat(fn) + rtest.OK(t, err) + node, err := restic.NodeFromFileInfo(fn, fi) + rtest.OK(t, err) + + rtest.OK(t, tree.Insert(node)) + rtest.OK(t, builder.AddNode(node)) + + rtest.Assert(t, tree.Insert(node) != nil, "no error on duplicate node") + rtest.Assert(t, builder.AddNode(node) != nil, "no error on duplicate node") + } + + treeBytes, err := json.Marshal(tree) + treeBytes = append(treeBytes, '\n') + rtest.OK(t, err) + + stiBytes, err := builder.Finalize() + rtest.OK(t, err) + + // compare serialization of an individual node and the SaveTreeIterator + rtest.Equals(t, treeBytes, stiBytes) + } +} + func BenchmarkBuildTree(b *testing.B) { const size = 100 // Directories of this size are not uncommon. From 4a10ebed15a6e7ffe4b99eb0f0337c777a4405b1 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 22 May 2022 15:14:25 +0200 Subject: [PATCH 6/7] archiver: reduce memory usage for large files FutureBlob now uses a Take() method as a more memory-efficient way to retrieve the futures result. In addition, futures are now collected while saving the file. As only a limited number of blobs can be queued for uploading, for a large file nearly all FutureBlobs already have their result ready, such that the FutureBlob object just consumes memory. --- internal/archiver/archiver.go | 12 ++--- internal/archiver/blob_saver.go | 74 ++++++++++++---------------- internal/archiver/blob_saver_test.go | 4 +- internal/archiver/file_saver.go | 33 +++++++++---- internal/archiver/file_saver_test.go | 2 +- 5 files changed, 64 insertions(+), 61 deletions(-) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 3dae576d9..4fcc8e30c 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -184,17 +184,17 @@ func (arch *Archiver) saveTree(ctx context.Context, t *restic.TreeJSONBuilder) ( b := &Buffer{Data: buf} res := arch.blobSaver.Save(ctx, restic.TreeBlob, b) - res.Wait(ctx) - if !res.Known() { + sbr := res.Take(ctx) + if !sbr.known { s.TreeBlobs++ - s.TreeSize += uint64(res.Length()) - s.TreeSizeInRepo += uint64(res.SizeInRepo()) + s.TreeSize += uint64(sbr.length) + s.TreeSizeInRepo += uint64(sbr.sizeInRepo) } - // The context was canceled in the meantime, res.ID() might be invalid + // The context was canceled in the meantime, id might be invalid if ctx.Err() != nil { return restic.ID{}, s, ctx.Err() } - return res.ID(), s, nil + return sbr.id, s, nil } // nodeFromFileInfo returns the restic node from an os.FileInfo. diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go index 0bd7d1fd9..b2b5e59bb 100644 --- a/internal/archiver/blob_saver.go +++ b/internal/archiver/blob_saver.go @@ -44,9 +44,7 @@ func (s *BlobSaver) TriggerShutdown() { // Save stores a blob in the repo. It checks the index and the known blobs // before saving anything. It takes ownership of the buffer passed in. 
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob { - // buf might be freed once the job was submitted, thus calculate the length now - length := len(buf.Data) - ch := make(chan saveBlobResponse, 1) + ch := make(chan SaveBlobResponse, 1) select { case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}: case <-ctx.Done(): @@ -55,72 +53,62 @@ func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) Fu return FutureBlob{ch: ch} } - return FutureBlob{ch: ch, length: length} + return FutureBlob{ch: ch} } // FutureBlob is returned by SaveBlob and will return the data once it has been processed. type FutureBlob struct { - ch <-chan saveBlobResponse - length int - res saveBlobResponse + ch <-chan SaveBlobResponse } -// Wait blocks until the result is available or the context is cancelled. -func (s *FutureBlob) Wait(ctx context.Context) { +func (s *FutureBlob) Poll() *SaveBlobResponse { select { - case <-ctx.Done(): - return case res, ok := <-s.ch: if ok { - s.res = res + return &res } + default: } + return nil } -// ID returns the ID of the blob after it has been saved. -func (s *FutureBlob) ID() restic.ID { - return s.res.id -} - -// Known returns whether or not the blob was already known. -func (s *FutureBlob) Known() bool { - return s.res.known -} - -// Length returns the raw length of the blob. -func (s *FutureBlob) Length() int { - return s.length -} - -// SizeInRepo returns the number of bytes added to the repo (including -// compression and crypto overhead). -func (s *FutureBlob) SizeInRepo() int { - return s.res.size +// Take blocks until the result is available or the context is cancelled. +func (s *FutureBlob) Take(ctx context.Context) SaveBlobResponse { + select { + case res, ok := <-s.ch: + if ok { + return res + } + case <-ctx.Done(): + } + return SaveBlobResponse{} } type saveBlobJob struct { restic.BlobType buf *Buffer - ch chan<- saveBlobResponse + ch chan<- SaveBlobResponse } -type saveBlobResponse struct { - id restic.ID - known bool - size int +type SaveBlobResponse struct { + id restic.ID + length int + sizeInRepo int + known bool } -func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) { - id, known, size, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false) +func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (SaveBlobResponse, error) { + id, known, sizeInRepo, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false) if err != nil { - return saveBlobResponse{}, err + return SaveBlobResponse{}, err } - return saveBlobResponse{ - id: id, - known: known, - size: size, + return SaveBlobResponse{ + id: id, + length: len(buf), + sizeInRepo: sizeInRepo, + known: known, }, nil } diff --git a/internal/archiver/blob_saver_test.go b/internal/archiver/blob_saver_test.go index 69cd4c2e2..481139a3f 100644 --- a/internal/archiver/blob_saver_test.go +++ b/internal/archiver/blob_saver_test.go @@ -54,8 +54,8 @@ func TestBlobSaver(t *testing.T) { } for i, blob := range results { - blob.Wait(ctx) - if blob.Known() { + sbr := blob.Take(ctx) + if sbr.known { t.Errorf("blob %v is known, that should not be the case", i) } } diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index 1e6eea979..52dd59113 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -129,6 +129,15 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat chnker.Reset(f, s.pol) var results []FutureBlob + 
complete := func(sbr SaveBlobResponse) { if !sbr.known { stats.DataBlobs++ stats.DataSize += uint64(sbr.length) stats.DataSizeInRepo += uint64(sbr.sizeInRepo) } node.Content = append(node.Content, sbr.id) } node.Content = []restic.ID{} var size uint64 @@ -168,6 +177,17 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat } s.CompleteBlob(f.Name(), uint64(len(chunk.Data))) + + // collect already completed blobs + for len(results) > 0 { + sbr := results[0].Poll() + if sbr == nil { + break + } + results[0] = FutureBlob{} + results = results[1:] + complete(*sbr) + } } err = f.Close() @@ -176,15 +196,10 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat return fnr } - for _, res := range results { - res.Wait(ctx) - if !res.Known() { - stats.DataBlobs++ - stats.DataSize += uint64(res.Length()) - stats.DataSizeInRepo += uint64(res.SizeInRepo()) - } - - node.Content = append(node.Content, res.ID()) + for i, res := range results { + results[i] = FutureBlob{} + sbr := res.Take(ctx) + complete(sbr) } node.Size = size diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go index 0bdb8ad50..e4d1dcdb8 100644 --- a/internal/archiver/file_saver_test.go +++ b/internal/archiver/file_saver_test.go @@ -34,7 +34,7 @@ func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Cont wg, ctx := errgroup.WithContext(ctx) saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob { - ch := make(chan saveBlobResponse) + ch := make(chan SaveBlobResponse) close(ch) return FutureBlob{ch: ch} } From 2ba14160de002a120214b94a4e2e88ae4774ef60 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 23 Jul 2022 14:49:08 +0200 Subject: [PATCH 7/7] Add changelog for the optimized tree serialization --- changelog/unreleased/pull-3773 | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 changelog/unreleased/pull-3773 diff --git a/changelog/unreleased/pull-3773 b/changelog/unreleased/pull-3773 new file mode 100644 index 000000000..17ae6f1bc --- /dev/null +++ b/changelog/unreleased/pull-3773 @@ -0,0 +1,7 @@ +Enhancement: Optimize memory usage for directories with many files + +Backing up a directory with hundreds of thousands of files or more causes +restic to require large amounts of memory. We have optimized the `backup` +command such that it requires up to 30% less memory. + +https://github.com/restic/restic/pull/3773
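For reference, the incremental result collection from PATCH 6/7 boils down to the pattern sketched below. This is a self-contained stand-in, not restic code: the future type and its Poll/Take methods only mimic the semantics of archiver.FutureBlob, and the worker is simulated by filling the channel immediately.

package main

import "fmt"

// future is a stand-in for archiver.FutureBlob: a one-shot channel that
// eventually carries the result of an asynchronous save.
type future struct{ ch chan int }

// Poll returns the result if it is already available, otherwise nil.
func (f *future) Poll() *int {
	select {
	case v, ok := <-f.ch:
		if ok {
			return &v
		}
	default:
	}
	return nil
}

// Take blocks until the result is available.
func (f *future) Take() int { return <-f.ch }

func main() {
	var pending []*future
	total := 0

	for i := 0; i < 1000; i++ {
		f := &future{ch: make(chan int, 1)}
		f.ch <- 1 // simulate a worker that has already finished this save
		pending = append(pending, f)

		// Drain results that are already complete, so the number of
		// pending futures stays bounded even for very large files.
		for len(pending) > 0 {
			v := pending[0].Poll()
			if v == nil {
				break
			}
			pending = pending[1:]
			total += *v
		}
	}

	// Wait for whatever is still outstanding at the end of the file.
	for _, f := range pending {
		total += f.Take()
	}
	fmt.Println(total) // 1000
}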