From e2bb384a607ab442c22d3c5f3e0b45f6de74f335 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charlotte=20=F0=9F=A6=9D=20Delenk?= Date: Tue, 14 Dec 2021 10:33:24 +0100 Subject: [PATCH] Parallelize blob upload/download for restic copy Currently restic copy will copy each blob from every snapshot serially, which limits throughput on high-latency backends such as b2. This commit introduces 8-way parallelism for blob downloads/uploads, which can speed up restic copy operations by up to 8x for repositories with many small blobs on b2. This commit also addresses the TODO comment in the copyTree function. Related work: A more thorough improvement of the restic copy performance can be found in PR #3513. --- changelog/unreleased/pull-3593 | 9 ++++++ cmd/restic/cmd_copy.go | 51 ++++++++++++++++++++++------------ 2 files changed, 42 insertions(+), 18 deletions(-) create mode 100644 changelog/unreleased/pull-3593 diff --git a/changelog/unreleased/pull-3593 b/changelog/unreleased/pull-3593 new file mode 100644 index 000000000..5b90561f2 --- /dev/null +++ b/changelog/unreleased/pull-3593 @@ -0,0 +1,9 @@ +Enhancement: Improve restic copy performance by parallelizing IO + +Restic copy previously only used a single thread for copying blobs between +repositories, which resulted in limited performance when copying small blobs +to/from a high latency backend (i.e. any remote backend, especially b2). +Copying will now use 8 parallel threads to increase the throughput of the copy +operation. 
+ +https://github.com/restic/restic/pull/3593 diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index d16cd1742..16ca3180b 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -174,9 +174,12 @@ func similarSnapshots(sna *restic.Snapshot, snb *restic.Snapshot) bool { return true } +const numCopyWorkers = 8 + func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository, visitedTrees restic.IDSet, rootTreeID restic.ID) error { + idChan := make(chan restic.ID) wg, ctx := errgroup.WithContext(ctx) treeStream := restic.StreamTrees(ctx, wg, srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool { @@ -186,9 +189,9 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep }, nil) wg.Go(func() error { + defer close(idChan) // reused buffer var buf []byte - for tree := range treeStream { if tree.Error != nil { return fmt.Errorf("LoadTree(%v) returned error %v", tree.ID.Str(), tree.Error) @@ -209,32 +212,44 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep } } - // TODO: parallelize blob down/upload - for _, entry := range tree.Nodes { // Recursion into directories is handled by StreamTrees // Copy the blobs for this file. for _, blobID := range entry.Content { - // Do we already have this data blob? 
- if dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) { - continue - } - debug.Log("Copying blob %s\n", blobID.Str()) - var err error - buf, err = srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, buf) - if err != nil { - return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err) - } - - _, _, err = dstRepo.SaveBlob(ctx, restic.DataBlob, buf, blobID, false) - if err != nil { - return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err) + select { + case idChan <- blobID: + case <-ctx.Done(): + return ctx.Err() } } } - } return nil }) + + for i := 0; i < numCopyWorkers; i++ { + wg.Go(func() error { + // reused buffer + var buf []byte + for blobID := range idChan { + // Do we already have this data blob? + if dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) { + continue + } + debug.Log("Copying blob %s\n", blobID.Str()) + var err error + buf, err = srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, buf) + if err != nil { + return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err) + } + + _, _, err = dstRepo.SaveBlob(ctx, restic.DataBlob, buf, blobID, false) + if err != nil { + return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err) + } + } + return nil + }) + } return wg.Wait() }