From 2b88cd6eab5cfc1dd0b43448e041a2365736b36c Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 19 Aug 2022 23:08:13 +0200 Subject: [PATCH] archiver: Restructure SaveTree to work like SaveDir SaveTree did not use the TreeSaver but rather managed the tree collection and upload itself. This prevents using the parallelism offered by the TreeSaver and duplicates all related code. Using the TreeSaver can provide some speed-ups as all steps within the backup tree now rely on FutureNodes. This can be especially relevant for backups with large amounts of explicitly specified files. The main difference between SaveTree and SaveDir is, that only the former can save tree blobs in which nodes have a different name than the actual file on disk. This is the result of resolving name conflicts between multiple files with the same name. The filename that must be used within the snapshot is now passed directly to restic.NodeFromFileInfo. This ensures that a FutureNode already contains the correct filename. --- internal/archiver/archiver.go | 180 +++++++++------------------ internal/archiver/archiver_test.go | 10 +- internal/archiver/file_saver.go | 4 +- internal/archiver/file_saver_test.go | 5 +- internal/archiver/tree_saver.go | 25 +++- internal/archiver/tree_saver_test.go | 49 +++++--- internal/restic/tree.go | 11 -- 7 files changed, 119 insertions(+), 165 deletions(-) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 776aedb53..da3548ae7 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -172,37 +172,14 @@ func (arch *Archiver) error(item string, err error) error { return errf } -// saveTree stores a tree in the repo. It checks the index and the known blobs -// before saving anything. -func (arch *Archiver) saveTree(ctx context.Context, t *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) { - var s ItemStats - buf, err := t.Finalize() - if err != nil { - return restic.ID{}, s, err - } - - b := &Buffer{Data: buf} - res := arch.blobSaver.Save(ctx, restic.TreeBlob, b) - - sbr := res.Take(ctx) - if !sbr.known { - s.TreeBlobs++ - s.TreeSize += uint64(sbr.length) - s.TreeSizeInRepo += uint64(sbr.sizeInRepo) - } - // The context was canceled in the meantime, id might be invalid - if ctx.Err() != nil { - return restic.ID{}, s, ctx.Err() - } - return sbr.id, s, nil -} - // nodeFromFileInfo returns the restic node from an os.FileInfo. -func (arch *Archiver) nodeFromFileInfo(filename string, fi os.FileInfo) (*restic.Node, error) { +func (arch *Archiver) nodeFromFileInfo(snPath, filename string, fi os.FileInfo) (*restic.Node, error) { node, err := restic.NodeFromFileInfo(filename, fi) if !arch.WithAtime { node.AccessTime = node.ModTime } + // overwrite name to match that within the snapshot + node.Name = path.Base(snPath) return node, errors.Wrap(err, "NodeFromFileInfo") } @@ -237,7 +214,7 @@ func (arch *Archiver) wrapLoadTreeError(id restic.ID, err error) error { func (arch *Archiver) SaveDir(ctx context.Context, snPath string, dir string, fi os.FileInfo, previous *restic.Tree, complete CompleteFunc) (d FutureNode, err error) { debug.Log("%v %v", snPath, dir) - treeNode, err := arch.nodeFromFileInfo(dir, fi) + treeNode, err := arch.nodeFromFileInfo(snPath, dir, fi) if err != nil { return FutureNode{}, err } @@ -393,7 +370,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous debug.Log("%v hasn't changed, using old list of blobs", target) arch.CompleteItem(snPath, previous, previous, ItemStats{}, time.Since(start)) arch.CompleteBlob(snPath, previous.Size) - node, err := arch.nodeFromFileInfo(target, fi) + node, err := arch.nodeFromFileInfo(snPath, target, fi) if err != nil { return FutureNode{}, false, err } @@ -488,7 +465,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous default: debug.Log(" %v other", target) - node, err := arch.nodeFromFileInfo(target, fi) + node, err := arch.nodeFromFileInfo(snPath, target, fi) if err != nil { return FutureNode{}, false, err } @@ -557,13 +534,32 @@ func (arch *Archiver) statDir(dir string) (os.FileInfo, error) { // SaveTree stores a Tree in the repo, returned is the tree. snPath is the path // within the current snapshot. -func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, previous *restic.Tree) (*restic.Tree, error) { +func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, previous *restic.Tree, complete CompleteFunc) (FutureNode, int, error) { + + var node *restic.Node + if snPath != "/" { + if atree.FileInfoPath == "" { + return FutureNode{}, 0, errors.Errorf("FileInfoPath for %v is empty", snPath) + } + + fi, err := arch.statDir(atree.FileInfoPath) + if err != nil { + return FutureNode{}, 0, err + } + + debug.Log("%v, dir node data loaded from %v", snPath, atree.FileInfoPath) + node, err = arch.nodeFromFileInfo(snPath, atree.FileInfoPath, fi) + if err != nil { + return FutureNode{}, 0, err + } + } else { + // fake root node + node = &restic.Node{} + } + debug.Log("%v (%v nodes), parent %v", snPath, len(atree.Nodes), previous) - nodeNames := atree.NodeNames() - tree := restic.NewTree(len(nodeNames)) - - futureNodes := make(map[string]FutureNode) + nodes := make([]FutureNode, 0, len(nodeNames)) // iterate over the nodes of atree in lexicographic (=deterministic) order for _, name := range nodeNames { @@ -571,7 +567,7 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, // test if context has been cancelled if ctx.Err() != nil { - return nil, ctx.Err() + return FutureNode{}, 0, ctx.Err() } // this is a leaf node @@ -584,15 +580,15 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, // ignore error continue } - return nil, err + return FutureNode{}, 0, err } if err != nil { - return nil, err + return FutureNode{}, 0, err } if !excluded { - futureNodes[name] = fn + nodes = append(nodes, fn) } continue } @@ -606,85 +602,21 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, err = arch.error(join(snPath, name), err) } if err != nil { - return nil, err + return FutureNode{}, 0, err } // not a leaf node, archive subtree - subtree, err := arch.SaveTree(ctx, join(snPath, name), &subatree, oldSubtree) + fn, _, err := arch.SaveTree(ctx, join(snPath, name), &subatree, oldSubtree, func(n *restic.Node, is ItemStats) { + arch.CompleteItem(snItem, oldNode, n, is, time.Since(start)) + }) if err != nil { - return nil, err + return FutureNode{}, 0, err } - - tb, err := restic.TreeToBuilder(subtree) - if err != nil { - return nil, err - } - id, nodeStats, err := arch.saveTree(ctx, tb) - if err != nil { - return nil, err - } - - if subatree.FileInfoPath == "" { - return nil, errors.Errorf("FileInfoPath for %v/%v is empty", snPath, name) - } - - debug.Log("%v, saved subtree %v as %v", snPath, subtree, id.Str()) - - fi, err := arch.statDir(subatree.FileInfoPath) - if err != nil { - return nil, err - } - - debug.Log("%v, dir node data loaded from %v", snPath, subatree.FileInfoPath) - - node, err := arch.nodeFromFileInfo(subatree.FileInfoPath, fi) - if err != nil { - return nil, err - } - - node.Name = name - node.Subtree = &id - - err = tree.Insert(node) - if err != nil { - return nil, err - } - - arch.CompleteItem(snItem, oldNode, node, nodeStats, time.Since(start)) + nodes = append(nodes, fn) } - debug.Log("waiting on %d nodes", len(futureNodes)) - - // process all futures - for name, fn := range futureNodes { - fnr := fn.take(ctx) - - // return the error, or ignore it - if fnr.err != nil { - fnr.err = arch.error(fnr.target, fnr.err) - if fnr.err == nil { - // ignore error - continue - } - - return nil, fnr.err - } - - // when the error is ignored, the node could not be saved, so ignore it - if fnr.node == nil { - debug.Log("%v excluded: %v", fnr.snPath, fnr.target) - continue - } - - fnr.node.Name = name - - err := tree.Insert(fnr.node) - if err != nil { - return nil, err - } - } - - return tree, nil + fn := arch.treeSaver.Save(ctx, snPath, atree.FileInfoPath, node, nodes, complete) + return fn, len(nodes), nil } // flags are passed to fs.OpenFile. O_RDONLY is implied. @@ -786,7 +718,7 @@ func (arch *Archiver) runWorkers(ctx context.Context, wg *errgroup.Group) { arch.fileSaver.CompleteBlob = arch.CompleteBlob arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo - arch.treeSaver = NewTreeSaver(ctx, wg, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.Error) + arch.treeSaver = NewTreeSaver(ctx, wg, arch.Options.SaveTreeConcurrency, arch.blobSaver.Save, arch.Error) } func (arch *Archiver) stopWorkers() { @@ -819,27 +751,33 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps wg, wgCtx := errgroup.WithContext(wgUpCtx) start := time.Now() - var stats ItemStats wg.Go(func() error { arch.runWorkers(wgCtx, wg) debug.Log("starting snapshot") - tree, err := arch.SaveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot)) + fn, nodeCount, err := arch.SaveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot), func(n *restic.Node, is ItemStats) { + arch.CompleteItem("/", nil, nil, is, time.Since(start)) + }) if err != nil { return err } - if len(tree.Nodes) == 0 { + fnr := fn.take(wgCtx) + if fnr.err != nil { + return fnr.err + } + + if wgCtx.Err() != nil { + return wgCtx.Err() + } + + if nodeCount == 0 { return errors.New("snapshot is empty") } - tb, err := restic.TreeToBuilder(tree) - if err != nil { - return err - } - rootTreeID, stats, err = arch.saveTree(wgCtx, tb) + rootTreeID = *fnr.node.Subtree arch.stopWorkers() - return err + return nil }) err = wg.Wait() @@ -850,8 +788,6 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps return err } - arch.CompleteItem("/", nil, nil, stats, time.Since(start)) - return arch.Repo.Flush(ctx) }) err = wgUp.Wait() diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index b5650d1b6..9f03514b8 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -1118,16 +1118,18 @@ func TestArchiverSaveTree(t *testing.T) { t.Fatal(err) } - tree, err := arch.SaveTree(ctx, "/", atree, nil) + fn, _, err := arch.SaveTree(ctx, "/", atree, nil, nil) if err != nil { t.Fatal(err) } - treeID, err := restic.SaveTree(ctx, repo, tree) - if err != nil { - t.Fatal(err) + fnr := fn.take(context.TODO()) + if fnr.err != nil { + t.Fatal(fnr.err) } + treeID := *fnr.node.Subtree + arch.stopWorkers() err = repo.Flush(ctx) if err != nil { diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index 52dd59113..460ba7457 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -27,7 +27,7 @@ type FileSaver struct { CompleteBlob func(filename string, bytes uint64) - NodeFromFileInfo func(filename string, fi os.FileInfo) (*restic.Node, error) + NodeFromFileInfo func(snPath, filename string, fi os.FileInfo) (*restic.Node, error) } // NewFileSaver returns a new file saver. A worker pool with fileWorkers is @@ -112,7 +112,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat debug.Log("%v", snPath) - node, err := s.NodeFromFileInfo(f.Name(), fi) + node, err := s.NodeFromFileInfo(snPath, f.Name(), fi) if err != nil { _ = f.Close() fnr.err = err diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go index e4d1dcdb8..a311216c7 100644 --- a/internal/archiver/file_saver_test.go +++ b/internal/archiver/file_saver_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io/ioutil" + "os" "path/filepath" "runtime" "testing" @@ -46,7 +47,9 @@ func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Cont } s := NewFileSaver(ctx, wg, saveBlob, pol, workers, workers) - s.NodeFromFileInfo = restic.NodeFromFileInfo + s.NodeFromFileInfo = func(snPath, filename string, fi os.FileInfo) (*restic.Node, error) { + return restic.NodeFromFileInfo(filename, fi) + } return s, ctx, wg } diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go index 5aab09b94..06f43cd46 100644 --- a/internal/archiver/tree_saver.go +++ b/internal/archiver/tree_saver.go @@ -10,7 +10,7 @@ import ( // TreeSaver concurrently saves incoming trees to the repo. type TreeSaver struct { - saveTree func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) + saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob errFn ErrorFunc ch chan<- saveTreeJob @@ -18,12 +18,12 @@ type TreeSaver struct { // NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is // started, it is stopped when ctx is cancelled. -func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveTree func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver { +func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob, errFn ErrorFunc) *TreeSaver { ch := make(chan saveTreeJob) s := &TreeSaver{ ch: ch, - saveTree: saveTree, + saveBlob: saveBlob, errFn: errFn, } @@ -110,13 +110,26 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I } } - id, treeStats, err := s.saveTree(ctx, builder) - stats.Add(treeStats) + buf, err := builder.Finalize() if err != nil { return nil, stats, err } - node.Subtree = &id + b := &Buffer{Data: buf} + res := s.saveBlob(ctx, restic.TreeBlob, b) + + sbr := res.Take(ctx) + if !sbr.known { + stats.TreeBlobs++ + stats.TreeSize += uint64(sbr.length) + stats.TreeSizeInRepo += uint64(sbr.sizeInRepo) + } + // The context was canceled in the meantime, id might be invalid + if ctx.Err() != nil { + return nil, stats, ctx.Err() + } + + node.Subtree = &sbr.id return node, stats, nil } diff --git a/internal/archiver/tree_saver_test.go b/internal/archiver/tree_saver_test.go index 36e585ae1..67ef21111 100644 --- a/internal/archiver/tree_saver_test.go +++ b/internal/archiver/tree_saver_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "runtime" - "sync/atomic" "testing" "github.com/restic/restic/internal/errors" @@ -12,18 +11,28 @@ import ( "golang.org/x/sync/errgroup" ) +func newFutureBlobWithResponse() FutureBlob { + ch := make(chan SaveBlobResponse, 1) + ch <- SaveBlobResponse{ + id: restic.NewRandomID(), + known: false, + length: 123, + sizeInRepo: 123, + } + return FutureBlob{ch: ch} +} + func TestTreeSaver(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() wg, ctx := errgroup.WithContext(ctx) - saveFn := func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) { - return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil + saveFn := func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob { + return newFutureBlobWithResponse() } - errFn := func(snPath string, err error) error { - return nil + return err } b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), saveFn, errFn) @@ -35,7 +44,7 @@ func TestTreeSaver(t *testing.T) { Name: fmt.Sprintf("file-%d", i), } - fb := b.Save(ctx, "/", node.Name, node, nil, nil) + fb := b.Save(ctx, join("/", node.Name), node.Name, node, nil, nil) results = append(results, fb) } @@ -54,7 +63,7 @@ func TestTreeSaver(t *testing.T) { func TestTreeSaverError(t *testing.T) { var tests = []struct { trees int - failAt int32 + failAt int }{ {1, 1}, {20, 2}, @@ -72,19 +81,11 @@ func TestTreeSaverError(t *testing.T) { wg, ctx := errgroup.WithContext(ctx) - var num int32 - saveFn := func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) { - val := atomic.AddInt32(&num, 1) - if val == test.failAt { - t.Logf("sending error for request %v\n", test.failAt) - return restic.ID{}, ItemStats{}, errTest - } - return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil + saveFn := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob { + return newFutureBlobWithResponse() } - errFn := func(snPath string, err error) error { - t.Logf("ignoring error %v\n", err) - return nil + return err } b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), saveFn, errFn) @@ -95,8 +96,18 @@ func TestTreeSaverError(t *testing.T) { node := &restic.Node{ Name: fmt.Sprintf("file-%d", i), } + nodes := []FutureNode{ + newFutureNodeWithResult(futureNodeResult{node: &restic.Node{ + Name: fmt.Sprintf("child-%d", i), + }}), + } + if (i + 1) == test.failAt { + nodes = append(nodes, newFutureNodeWithResult(futureNodeResult{ + err: errTest, + })) + } - fb := b.Save(ctx, "/", node.Name, node, nil, nil) + fb := b.Save(ctx, join("/", node.Name), node.Name, node, nodes, nil) results = append(results, fb) } diff --git a/internal/restic/tree.go b/internal/restic/tree.go index d1264074c..3851911df 100644 --- a/internal/restic/tree.go +++ b/internal/restic/tree.go @@ -182,14 +182,3 @@ func (builder *TreeJSONBuilder) Finalize() ([]byte, error) { builder.buf = bytes.Buffer{} return buf, nil } - -func TreeToBuilder(t *Tree) (*TreeJSONBuilder, error) { - builder := NewTreeJSONBuilder() - for _, node := range t.Nodes { - err := builder.AddNode(node) - if err != nil { - return nil, err - } - } - return builder, nil -}