From 187c8fb259a8324e066673bfe426b2cabd11a045 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Thu, 12 Nov 2020 02:49:53 +0100 Subject: [PATCH] Parallelize MasterIndex.Save() --- internal/repository/master_index.go | 112 ++++++++++++++++++---------- 1 file changed, 72 insertions(+), 40 deletions(-) diff --git a/internal/repository/master_index.go b/internal/repository/master_index.go index e7f01de7e..041bbc669 100644 --- a/internal/repository/master_index.go +++ b/internal/repository/master_index.go @@ -7,6 +7,7 @@ import ( "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/ui/progress" + "golang.org/x/sync/errgroup" ) // MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved. @@ -261,10 +262,12 @@ func (mi *MasterIndex) MergeFinalIndexes() { mi.idx = newIdx } +const saveIndexParallelism = 4 + // Save saves all known indexes to index files, leaving out any -// packs whose ID is contained in packBlacklist. The new index contains the IDs -// of all known indexes in the "supersedes" field. The IDs are also returned in -// the IDSet obsolete +// packs whose ID is contained in packBlacklist from finalized indexes. +// The new index contains the IDs of all known indexes in the "supersedes" +// field. The IDs are also returned in the IDSet obsolete. // After calling this function, you should remove the obsolete index files. func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBlacklist restic.IDSet, extraObsolete restic.IDs, p *progress.Counter) (obsolete restic.IDSet, err error) { mi.idxMutex.Lock() @@ -275,50 +278,79 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBla newIndex := NewIndex() obsolete = restic.NewIDSet() - for i, idx := range mi.idx { - if idx.Final() { - ids, err := idx.IDs() - if err != nil { - debug.Log("index %d does not have an ID: %v", err) - return nil, err - } + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. + wg, ctx := errgroup.WithContext(ctx) - debug.Log("adding index ids %v to supersedes field", ids) + ch := make(chan *Index) - err = newIndex.AddToSupersedes(ids...) - if err != nil { - return nil, err - } - obsolete.Merge(restic.NewIDSet(ids...)) - } else { - debug.Log("index %d isn't final, don't add to supersedes field", i) - } - - debug.Log("adding index %d", i) - - for pbs := range idx.EachByPack(ctx, packBlacklist) { - newIndex.StorePack(pbs.packID, pbs.blobs) - p.Add(1) - if IndexFull(newIndex) { - newIndex.Finalize() - if _, err := SaveIndex(ctx, repo, newIndex); err != nil { - return nil, err + wg.Go(func() error { + defer close(ch) + for i, idx := range mi.idx { + if idx.Final() { + ids, err := idx.IDs() + if err != nil { + debug.Log("index %d does not have an ID: %v", err) + return err + } + + debug.Log("adding index ids %v to supersedes field", ids) + + err = newIndex.AddToSupersedes(ids...) + if err != nil { + return err + } + obsolete.Merge(restic.NewIDSet(ids...)) + } else { + debug.Log("index %d isn't final, don't add to supersedes field", i) + } + + debug.Log("adding index %d", i) + + for pbs := range idx.EachByPack(ctx, packBlacklist) { + newIndex.StorePack(pbs.packID, pbs.blobs) + p.Add(1) + if IndexFull(newIndex) { + select { + case ch <- newIndex: + case <-ctx.Done(): + return nil + } + newIndex = NewIndex() } - newIndex = NewIndex() } } + + err = newIndex.AddToSupersedes(extraObsolete...) + if err != nil { + return err + } + obsolete.Merge(restic.NewIDSet(extraObsolete...)) + + select { + case ch <- newIndex: + case <-ctx.Done(): + } + return nil + }) + + // a worker receives an index from ch, and saves the index + worker := func() error { + for idx := range ch { + idx.Finalize() + if _, err := SaveIndex(ctx, repo, idx); err != nil { + return err + } + } + return nil } - err = newIndex.AddToSupersedes(extraObsolete...) - if err != nil { - return nil, err - } - obsolete.Merge(restic.NewIDSet(extraObsolete...)) + // run workers on ch + wg.Go(func() error { + return RunWorkers(saveIndexParallelism, worker) + }) - newIndex.Finalize() - if _, err := SaveIndex(ctx, repo, newIndex); err != nil { - return nil, err - } + err = wg.Wait() - return + return obsolete, err }