diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 5b3fd611a..677976153 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -50,6 +50,7 @@ type Archiver struct { blobSaver *BlobSaver fileSaver *FileSaver + treeSaver *TreeSaver // Error is called for all errors that occur during backup. Error ErrorFunc @@ -86,6 +87,10 @@ type Options struct { // concurrently. If it's set to zero, the default is the number of CPUs // available in the system. SaveBlobConcurrency uint + + // SaveTreeConcurrency sets how many trees are marshalled and saved to the + // repo concurrently. + SaveTreeConcurrency uint } // ApplyDefaults returns a copy of o with the default options set for all unset @@ -102,6 +107,12 @@ func (o Options) ApplyDefaults() Options { o.SaveBlobConcurrency = uint(runtime.NumCPU()) } + if o.SaveTreeConcurrency == 0 { + // use a relatively high concurrency here, having multiple SaveTree + // workers is cheap + o.SaveTreeConcurrency = o.SaveBlobConcurrency * 20 + } + return o } @@ -212,24 +223,20 @@ func (arch *Archiver) loadSubtree(ctx context.Context, node *restic.Node) *resti // SaveDir stores a directory in the repo and returns the node. snPath is the // path within the current snapshot. 
-func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree) (*restic.Node, ItemStats, error) { +func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree) (d FutureTree, err error) { debug.Log("%v %v", snPath, dir) - var s ItemStats - treeNode, err := arch.nodeFromFileInfo(dir, fi) if err != nil { - return nil, s, err + return FutureTree{}, err } names, err := readdirnames(arch.FS, dir) if err != nil { - return nil, s, err + return FutureTree{}, err } - var futures []FutureNode - - tree := restic.NewTree() + nodes := make([]FutureNode, 0, len(names)) for _, name := range names { pathname := arch.FS.Join(dir, name) @@ -245,54 +252,22 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo continue } - return nil, s, err + return FutureTree{}, err } if excluded { continue } - futures = append(futures, fn) + nodes = append(nodes, fn) } - for _, fn := range futures { - fn.wait() + ft := arch.treeSaver.Save(ctx, snPath, treeNode, nodes) - // return the error if it wasn't ignored - if fn.err != nil { - fn.err = arch.error(fn.target, fn.fi, fn.err) - if fn.err == nil { - // ignore error - continue - } - - return nil, s, fn.err - } - - // when the error is ignored, the node could not be saved, so ignore it - if fn.node == nil { - debug.Log("%v excluded: %v", fn.snPath, fn.target) - continue - } - - err := tree.Insert(fn.node) - if err != nil { - return nil, s, err - } - } - - id, treeStats, err := arch.saveTree(ctx, tree) - if err != nil { - return nil, ItemStats{}, err - } - - s.Add(treeStats) - - treeNode.Subtree = &id - return treeNode, s, nil + return ft, nil } -// FutureNode holds a reference to a node or a FutureFile. +// FutureNode holds a reference to a node, FutureFile, or FutureTree. 
type FutureNode struct { snPath, target string @@ -306,14 +281,31 @@ type FutureNode struct { isFile bool file FutureFile + isDir bool + dir FutureTree } -func (fn *FutureNode) wait() { - if fn.isFile { +func (fn *FutureNode) wait(ctx context.Context) { + switch { + case fn.isFile: // wait for and collect the data for the file fn.node = fn.file.Node() fn.err = fn.file.Err() fn.stats = fn.file.Stats() + + // ensure the other stuff can be garbage-collected + fn.file = FutureFile{} + fn.isFile = false + + case fn.isDir: + // wait for and collect the data for the dir + fn.node = fn.dir.Node() + fn.err = fn.dir.Err() + fn.stats = fn.dir.Stats() + + // ensure the other stuff can be garbage-collected + fn.dir = FutureTree{} + fn.isDir = false } } @@ -324,6 +316,8 @@ func (fn *FutureNode) wait() { // // snPath is the path within the current snapshot. func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous *restic.Node) (fn FutureNode, excluded bool, err error) { + start := time.Now() + fn = FutureNode{ snPath: snPath, target: target, @@ -400,7 +394,9 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous snItem := snPath + "/" start := time.Now() oldSubtree := arch.loadSubtree(ctx, previous) - fn.node, fn.stats, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree) + + fn.isDir = true + fn.dir, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree) if err == nil { arch.CompleteItem(snItem, previous, fn.node, fn.stats, time.Since(start)) } else { @@ -429,6 +425,8 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous } } + debug.Log("return after %.3f", time.Since(start).Seconds()) + return fn, false, nil } @@ -564,9 +562,11 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, arch.CompleteItem(snItem, oldNode, node, nodeStats, time.Since(start)) } + debug.Log("waiting on %d nodes", len(futureNodes)) + // process all futures for name, fn := range futureNodes { - 
fn.wait() + fn.wait(ctx) // return the error, or ignore it if fn.err != nil { @@ -720,14 +720,16 @@ func (arch *Archiver) loadParentTree(ctx context.Context, snapshotID restic.ID) // runWorkers starts the worker pools, which are stopped when the context is cancelled. func (arch *Archiver) runWorkers(ctx context.Context) { arch.blobSaver = NewBlobSaver(ctx, arch.Repo, arch.Options.SaveBlobConcurrency) + arch.fileSaver = NewFileSaver(ctx, arch.FS, arch.blobSaver, arch.Repo.Config().ChunkerPolynomial, arch.Options.FileReadConcurrency, arch.Options.SaveBlobConcurrency) arch.fileSaver.CompleteBlob = arch.CompleteBlob - arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo + + arch.treeSaver = NewTreeSaver(ctx, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error) } // Snapshot saves several targets and returns a snapshot. diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index a8557ef2a..8916f58a3 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -608,7 +608,12 @@ func TestArchiverSaveDir(t *testing.T) { t.Fatal(err) } - node, stats, err := arch.SaveDir(ctx, "/", fi, test.target, nil) + ft, err := arch.SaveDir(ctx, "/", fi, test.target, nil) + if err != nil { + t.Fatal(err) + } + + node, stats, err := ft.Node(), ft.Stats(), ft.Err() if err != nil { t.Fatal(err) } @@ -681,7 +686,12 @@ func TestArchiverSaveDirIncremental(t *testing.T) { t.Fatal(err) } - node, stats, err := arch.SaveDir(ctx, "/", fi, tempdir, nil) + ft, err := arch.SaveDir(ctx, "/", fi, tempdir, nil) + if err != nil { + t.Fatal(err) + } + + node, stats, err := ft.Node(), ft.Stats(), ft.Err() if err != nil { t.Fatal(err) } diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go index 1863d440e..4d0f39c48 100644 --- a/internal/archiver/blob_saver.go +++ b/internal/archiver/blob_saver.go @@ -27,7 +27,7 @@ type BlobSaver struct { // NewBlobSaver returns a new blob. 
A worker pool is started, it is stopped // when ctx is cancelled. func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver { - ch := make(chan saveBlobJob, 2*int(workers)) + ch := make(chan saveBlobJob) s := &BlobSaver{ repo: repo, knownBlobs: restic.NewBlobSet(), diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go new file mode 100644 index 000000000..6428c6289 --- /dev/null +++ b/internal/archiver/tree_saver.go @@ -0,0 +1,158 @@ +package archiver + +import ( + "context" + "sync" + + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/restic" +) + +// FutureTree is returned by Save and will return the data once it +// has been processed. +type FutureTree struct { + ch <-chan saveTreeResponse + res saveTreeResponse +} + +func (s *FutureTree) wait() { + res, ok := <-s.ch + if ok { + s.res = res + } +} + +// Node returns the node once it is available. +func (s *FutureTree) Node() *restic.Node { + s.wait() + return s.res.node +} + +// Stats returns the stats for the file once they are available. +func (s *FutureTree) Stats() ItemStats { + s.wait() + return s.res.stats +} + +// Err returns the error in case an error occurred. +func (s *FutureTree) Err() error { + s.wait() + return s.res.err +} + +// TreeSaver concurrently saves incoming trees to the repo. +type TreeSaver struct { + saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) + errFn ErrorFunc + + ch chan<- saveTreeJob + wg sync.WaitGroup +} + +// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is +// started, it is stopped when ctx is cancelled. 
+func NewTreeSaver(ctx context.Context, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
+	// unbuffered: Save hands every job directly to a receiving worker
+	ch := make(chan saveTreeJob)
+
+	s := &TreeSaver{
+		ch:       ch,
+		saveTree: saveTree,
+		errFn:    errFn,
+	}
+
+	// with treeWorkers == 0 no worker is started and nothing receives from ch
+	for i := uint(0); i < treeWorkers; i++ {
+		s.wg.Add(1)
+		go s.worker(ctx, &s.wg, ch)
+	}
+
+	return s
+}
+
+// Save queues the directory described by node, whose entries are the futures
+// in nodes, and returns a FutureTree that yields the result once a worker has
+// processed the job. snPath is the path within the current snapshot.
+//
+// If ctx is cancelled before a worker accepts the job, the job is dropped and
+// the returned FutureTree reports ctx.Err().
+func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) FutureTree {
+	// buffered with one slot so that the single response can always be sent
+	// without blocking, even if nobody reads the future
+	ch := make(chan saveTreeResponse, 1)
+	job := saveTreeJob{
+		snPath: snPath,
+		node:   node,
+		nodes:  nodes,
+		ch:     ch,
+	}
+
+	select {
+	case s.ch <- job:
+	case <-ctx.Done():
+		// the workers return when ctx is cancelled, so a plain send on the
+		// unbuffered job channel would block forever
+		debug.Log("not saving tree %v, context is cancelled", snPath)
+		ch <- saveTreeResponse{err: ctx.Err()}
+		close(ch)
+	}
+
+	return FutureTree{ch: ch}
+}
+
+// saveTreeJob is the unit of work passed from Save to a worker.
+type saveTreeJob struct {
+	snPath string
+	nodes  []FutureNode
+	node   *restic.Node
+	// ch receives exactly one saveTreeResponse and is closed afterwards
+	ch chan<- saveTreeResponse
+}
+
+// saveTreeResponse is the result of a saveTreeJob.
+type saveTreeResponse struct {
+	node  *restic.Node
+	stats ItemStats
+	err   error
+}
+
+// save stores the nodes as a tree in the repo. It waits for every future in
+// nodes in order, inserts the resulting nodes into a new tree, saves that tree
+// via s.saveTree and stores the tree ID in node.Subtree. An error reported by
+// a future is passed to s.errFn; if that returns nil the entry is skipped,
+// otherwise the returned error aborts the save.
+func (s *TreeSaver) save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) (*restic.Node, ItemStats, error) {
+	var stats ItemStats
+
+	tree := restic.NewTree()
+	for _, fn := range nodes {
+		fn.wait(ctx)
+
+		// return the error if it wasn't ignored
+		if fn.err != nil {
+			// fn.node may be nil here (a failed tree future carries no
+			// node), so log the snapshot path instead of fn.node.Name
+			debug.Log("err for %v: %v", fn.snPath, fn.err)
+			fn.err = s.errFn(fn.target, fn.fi, fn.err)
+			if fn.err == nil {
+				// ignore error
+				continue
+			}
+
+			return nil, stats, fn.err
+		}
+
+		// when the error is ignored, the node could not be saved, so ignore it
+		if fn.node == nil {
+			debug.Log("%v excluded: %v", fn.snPath, fn.target)
+			continue
+		}
+
+		debug.Log("insert %v", fn.node.Name)
+		err := tree.Insert(fn.node)
+		if err != nil {
+			return nil, stats, err
+		}
+	}
+
+	id, treeStats, err := s.saveTree(ctx, tree)
+	stats.Add(treeStats)
+	if err != nil {
+		return nil, stats, err
+	}
+
+	node.Subtree = &id
+	return node, stats, nil
+}
+
+// worker receives jobs until ctx is cancelled. For every job it sends exactly
+// one response on the job's channel and then closes that channel.
+func (s *TreeSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveTreeJob) {
+	defer wg.Done()
+	for {
+		var job saveTreeJob
+		select {
+		case <-ctx.Done():
+			return
+		case job = <-jobs:
+		}
+
+		node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes)
+		// job.ch has room for one response (see Save), so this never blocks
+		job.ch <- saveTreeResponse{
+			node:  node,
+			stats: stats,
+			err:   err,
+		}
+		close(job.ch)
+	}
+}