diff --git a/changelog/unreleased/pull-3593 b/changelog/unreleased/pull-3593 new file mode 100644 index 000000000..5b90561f2 --- /dev/null +++ b/changelog/unreleased/pull-3593 @@ -0,0 +1,9 @@ +Enhancement: Improve restic copy performance by parallelizing IO + +Restic copy previously used only a single thread for copying blobs between +repositories, which resulted in limited performance when copying small blobs +to/from a high-latency backend (i.e. any remote backend, especially b2). +Copying will now use 8 parallel threads to increase the throughput of the copy +operation. + +https://github.com/restic/restic/pull/3593 diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index 030abc37d..4d7a192e0 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -176,9 +176,12 @@ func similarSnapshots(sna *restic.Snapshot, snb *restic.Snapshot) bool { return true } +const numCopyWorkers = 8 + func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository, visitedTrees restic.IDSet, rootTreeID restic.ID) error { + idChan := make(chan restic.ID) wg, ctx := errgroup.WithContext(ctx) treeStream := restic.StreamTrees(ctx, wg, srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool { @@ -188,9 +191,9 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep }, nil) wg.Go(func() error { + defer close(idChan) // reused buffer var buf []byte - for tree := range treeStream { if tree.Error != nil { return fmt.Errorf("LoadTree(%v) returned error %v", tree.ID.Str(), tree.Error) @@ -211,32 +214,44 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep } } - // TODO: parallelize blob down/upload - for _, entry := range tree.Nodes { // Recursion into directories is handled by StreamTrees // Copy the blobs for this file. for _, blobID := range entry.Content { - // Do we already have this data blob?
- if dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) { - continue - } - debug.Log("Copying blob %s\n", blobID.Str()) - var err error - buf, err = srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, buf) - if err != nil { - return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err) - } - - _, _, err = dstRepo.SaveBlob(ctx, restic.DataBlob, buf, blobID, false) - if err != nil { - return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err) + select { + case idChan <- blobID: + case <-ctx.Done(): + return ctx.Err() } } } - } return nil }) + + for i := 0; i < numCopyWorkers; i++ { + wg.Go(func() error { + // reused buffer + var buf []byte + for blobID := range idChan { + // Do we already have this data blob? + if dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) { + continue + } + debug.Log("Copying blob %s\n", blobID.Str()) + var err error + buf, err = srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, buf) + if err != nil { + return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err) + } + + _, _, err = dstRepo.SaveBlob(ctx, restic.DataBlob, buf, blobID, false) + if err != nil { + return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err) + } + } + return nil + }) + } return wg.Wait() }