From 203d775190b09a85a40bac19864e9f2ac58ab631 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Fri, 22 Mar 2019 20:30:29 +0100 Subject: [PATCH 1/6] restic: Make JSON unmarshal for ID more efficient This commit reduces several allocations in UnmarshalJSON() by decoding the hex string directly in a single step. --- internal/restic/id.go | 31 +++++++++++++++++++++++++----- internal/restic/id_test.go | 39 +++++++++++++++++++++++++++++++++++++- 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/internal/restic/id.go b/internal/restic/id.go index ffe818a83..bc9749e77 100644 --- a/internal/restic/id.go +++ b/internal/restic/id.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" + "fmt" "io" "github.com/restic/restic/internal/errors" @@ -101,13 +102,33 @@ func (id ID) MarshalJSON() ([]byte, error) { // UnmarshalJSON parses the JSON-encoded data and stores the result in id. func (id *ID) UnmarshalJSON(b []byte) error { - var s string - err := json.Unmarshal(b, &s) - if err != nil { - return errors.Wrap(err, "Unmarshal") + // check string length + if len(b) < 2 { + return fmt.Errorf("invalid ID: %q", b) } - _, err = hex.Decode(id[:], []byte(s)) + if len(b)%2 != 0 { + return fmt.Errorf("invalid ID length: %q", b) + } + + // check string delimiters + if b[0] != '"' && b[0] != '\'' { + return fmt.Errorf("invalid start of string: %q", b[0]) + } + + last := len(b) - 1 + if b[0] != b[last] { + return fmt.Errorf("starting string delimiter (%q) does not match end (%q)", b[0], b[last]) + } + + // strip JSON string delimiters + b = b[1:last] + + if len(b) != 2*len(id) { + return fmt.Errorf("invalid length for ID") + } + + _, err := hex.Decode(id[:], b) if err != nil { return errors.Wrap(err, "hex.Decode") } diff --git a/internal/restic/id_test.go b/internal/restic/id_test.go index 2e9634a19..ff1dc54e0 100644 --- a/internal/restic/id_test.go +++ b/internal/restic/id_test.go @@ -51,10 +51,47 @@ func TestID(t *testing.T) { var id3 ID err = 
id3.UnmarshalJSON(buf) if err != nil { - t.Fatal(err) + t.Fatalf("error for %q: %v", buf, err) } if !reflect.DeepEqual(id, id3) { t.Error("ids are not equal") } } } + +func TestIDUnmarshal(t *testing.T) { + var tests = []struct { + s string + valid bool + }{ + {`"`, false}, + {`""`, false}, + {`'`, false}, + {`"`, false}, + {`"c3ab8ff13720e8ad9047dd39466b3c8974e592c2fa383d4a3960714caef0c4"`, false}, + {`"c3ab8ff13720e8ad9047dd39466b3c8974e592c2fa383d4a3960714caef0c4f"`, false}, + {`"c3ab8ff13720e8ad9047dd39466b3c8974e592c2fa383d4a3960714caef0c4f2"`, true}, + } + + wantID, err := ParseID("c3ab8ff13720e8ad9047dd39466b3c8974e592c2fa383d4a3960714caef0c4f2") + if err != nil { + t.Fatal(err) + } + + for _, test := range tests { + t.Run("", func(t *testing.T) { + id := &ID{} + err := id.UnmarshalJSON([]byte(test.s)) + if test.valid && err != nil { + t.Fatal(err) + } + if !test.valid && err == nil { + t.Fatalf("want error for invalid value, got nil") + } + + if test.valid && !id.Equal(wantID) { + t.Fatalf("wrong ID returned, want %s, got %s", wantID, id) + } + }) + } +} From 75906edef5eaf673ae15b6846c197c861a61c2f0 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 24 Mar 2019 20:37:06 +0100 Subject: [PATCH 2/6] mock: Remove unused repository type --- internal/mock/repository.go | 141 ------------------------------------ 1 file changed, 141 deletions(-) delete mode 100644 internal/mock/repository.go diff --git a/internal/mock/repository.go b/internal/mock/repository.go deleted file mode 100644 index c3a9f0f9f..000000000 --- a/internal/mock/repository.go +++ /dev/null @@ -1,141 +0,0 @@ -package mock - -import ( - "github.com/restic/restic/internal/crypto" - "github.com/restic/restic/internal/restic" -) - -// Repository implements a mock Repository. 
-type Repository struct { - BackendFn func() restic.Backend - - KeyFn func() *crypto.Key - - SetIndexFn func(restic.Index) error - - IndexFn func() restic.Index - SaveFullIndexFn func() error - SaveIndexFn func() error - LoadIndexFn func() error - - ConfigFn func() restic.Config - - LookupBlobSizeFn func(restic.ID, restic.BlobType) (uint, error) - - ListFn func(restic.FileType, <-chan struct{}) <-chan restic.ID - ListPackFn func(restic.ID) ([]restic.Blob, int64, error) - - FlushFn func() error - - SaveUnpackedFn func(restic.FileType, []byte) (restic.ID, error) - SaveJSONUnpackedFn func(restic.FileType, interface{}) (restic.ID, error) - - LoadJSONUnpackedFn func(restic.FileType, restic.ID, interface{}) error - LoadAndDecryptFn func(restic.FileType, restic.ID) ([]byte, error) - - LoadBlobFn func(restic.BlobType, restic.ID, []byte) (int, error) - SaveBlobFn func(restic.BlobType, []byte, restic.ID) (restic.ID, error) - - LoadTreeFn func(restic.ID) (*restic.Tree, error) - SaveTreeFn func(t *restic.Tree) (restic.ID, error) -} - -// Backend is a stub method. -func (repo Repository) Backend() restic.Backend { - return repo.BackendFn() -} - -// Key is a stub method. -func (repo Repository) Key() *crypto.Key { - return repo.KeyFn() -} - -// SetIndex is a stub method. -func (repo Repository) SetIndex(idx restic.Index) error { - return repo.SetIndexFn(idx) -} - -// Index is a stub method. -func (repo Repository) Index() restic.Index { - return repo.IndexFn() -} - -// SaveFullIndex is a stub method. -func (repo Repository) SaveFullIndex() error { - return repo.SaveFullIndexFn() -} - -// SaveIndex is a stub method. -func (repo Repository) SaveIndex() error { - return repo.SaveIndexFn() -} - -// LoadIndex is a stub method. -func (repo Repository) LoadIndex() error { - return repo.LoadIndexFn() -} - -// Config is a stub method. -func (repo Repository) Config() restic.Config { - return repo.ConfigFn() -} - -// LookupBlobSize is a stub method. 
-func (repo Repository) LookupBlobSize(id restic.ID, t restic.BlobType) (uint, error) { - return repo.LookupBlobSizeFn(id, t) -} - -// List is a stub method. -func (repo Repository) List(t restic.FileType, done <-chan struct{}) <-chan restic.ID { - return repo.ListFn(t, done) -} - -// ListPack is a stub method. -func (repo Repository) ListPack(id restic.ID) ([]restic.Blob, int64, error) { - return repo.ListPackFn(id) -} - -// Flush is a stub method. -func (repo Repository) Flush() error { - return repo.FlushFn() -} - -// SaveUnpacked is a stub method. -func (repo Repository) SaveUnpacked(t restic.FileType, buf []byte) (restic.ID, error) { - return repo.SaveUnpackedFn(t, buf) -} - -// SaveJSONUnpacked is a stub method. -func (repo Repository) SaveJSONUnpacked(t restic.FileType, item interface{}) (restic.ID, error) { - return repo.SaveJSONUnpackedFn(t, item) -} - -// LoadJSONUnpacked is a stub method. -func (repo Repository) LoadJSONUnpacked(t restic.FileType, id restic.ID, item interface{}) error { - return repo.LoadJSONUnpackedFn(t, id, item) -} - -// LoadAndDecrypt is a stub method. -func (repo Repository) LoadAndDecrypt(t restic.FileType, id restic.ID) ([]byte, error) { - return repo.LoadAndDecryptFn(t, id) -} - -// LoadBlob is a stub method. -func (repo Repository) LoadBlob(t restic.BlobType, id restic.ID, buf []byte) (int, error) { - return repo.LoadBlobFn(t, id, buf) -} - -// SaveBlob is a stub method. -func (repo Repository) SaveBlob(t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) { - return repo.SaveBlobFn(t, buf, id) -} - -// LoadTree is a stub method. -func (repo Repository) LoadTree(id restic.ID) (*restic.Tree, error) { - return repo.LoadTreeFn(id) -} - -// SaveTree is a stub method. 
-func (repo Repository) SaveTree(t *restic.Tree) (restic.ID, error) { - return repo.SaveTreeFn(t) -} From e046428c9499699e7772f5b9d9bf3b1f21cb3f3d Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 24 Mar 2019 21:27:28 +0100 Subject: [PATCH 3/6] Replace FilesInParallel with an errgroup.Group --- internal/checker/checker.go | 208 ++++++++++++++++----------- internal/repository/parallel.go | 65 --------- internal/repository/parallel_test.go | 129 ----------------- internal/repository/repository.go | 90 ++++++++---- internal/repository/worker_group.go | 35 +++++ 5 files changed, 227 insertions(+), 300 deletions(-) delete mode 100644 internal/repository/parallel.go delete mode 100644 internal/repository/parallel_test.go create mode 100644 internal/repository/worker_group.go diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 7255e990d..79b3cb4b9 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -74,82 +74,107 @@ func (err ErrOldIndexFormat) Error() string { // LoadIndex loads all index files. func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { debug.Log("Start") - type indexRes struct { - Index *repository.Index - err error - ID string + + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. 
+ wg, ctx := errgroup.WithContext(ctx) + + type FileInfo struct { + restic.ID + Size int64 + } - indexCh := make(chan indexRes) + type Result struct { + *repository.Index + restic.ID + Err error + } - worker := func(ctx context.Context, id restic.ID) error { - debug.Log("worker got index %v", id) - idx, err := repository.LoadIndexWithDecoder(ctx, c.repo, id, repository.DecodeIndex) - if errors.Cause(err) == repository.ErrOldIndexFormat { - debug.Log("index %v has old format", id) - hints = append(hints, ErrOldIndexFormat{id}) + ch := make(chan FileInfo) + resultCh := make(chan Result) - idx, err = repository.LoadIndexWithDecoder(ctx, c.repo, id, repository.DecodeOldIndex) + // send list of index files through ch, which is closed afterwards + wg.Go(func() error { + defer close(ch) + return c.repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { + select { + case <-ctx.Done(): + return nil + case ch <- FileInfo{id, size}: + } + return nil + }) + }) + + // a worker receives an index ID from ch, loads the index, and sends the result to resultCh + worker := func() error { + for fi := range ch { + debug.Log("worker got file %v", fi.ID.Str()) + idx, err := repository.LoadIndexWithDecoder(ctx, c.repo, fi.ID, repository.DecodeIndex) + if errors.Cause(err) == repository.ErrOldIndexFormat { + debug.Log("index %v has old format", fi.ID.Str()) + hints = append(hints, ErrOldIndexFormat{fi.ID}) + + idx, err = repository.LoadIndexWithDecoder(ctx, c.repo, fi.ID, repository.DecodeOldIndex) + } + + err = errors.Wrapf(err, "error loading index %v", fi.ID.Str()) + + select { + case resultCh <- Result{idx, fi.ID, err}: + case <-ctx.Done(): + } } - - err = errors.Wrapf(err, "error loading index %v", id.Str()) - - select { - case indexCh <- indexRes{Index: idx, ID: id.String(), err: err}: - case <-ctx.Done(): - } - return nil } - go func() { - defer close(indexCh) - debug.Log("start loading indexes in parallel") - err := repository.FilesInParallel(ctx, c.repo.Backend(), 
restic.IndexFile, defaultParallelism, - repository.ParallelWorkFuncParseID(worker)) - debug.Log("loading indexes finished, error: %v", err) - if err != nil { - panic(err) - } - }() + // final closes resultCh after all workers have terminated + final := func() error { + close(resultCh) + return nil + } - done := make(chan struct{}) - defer close(done) + // run workers on ch + wg.Go(func() error { + return repository.RunWorkers(ctx, defaultParallelism, worker, final) + }) + // receive decoded indexes packToIndex := make(map[restic.ID]restic.IDSet) + wg.Go(func() error { + for res := range resultCh { + debug.Log("process index %v, err %v", res.ID, res.Err) - for res := range indexCh { - debug.Log("process index %v, err %v", res.ID, res.err) - - if res.err != nil { - errs = append(errs, res.err) - continue - } - - idxID, err := restic.ParseID(res.ID) - if err != nil { - errs = append(errs, errors.Errorf("unable to parse as index ID: %v", res.ID)) - continue - } - - c.indexes[idxID] = res.Index - c.masterIndex.Insert(res.Index) - - debug.Log("process blobs") - cnt := 0 - for blob := range res.Index.Each(ctx) { - c.packs.Insert(blob.PackID) - c.blobs.Insert(blob.ID) - c.blobRefs.M[blob.ID] = 0 - cnt++ - - if _, ok := packToIndex[blob.PackID]; !ok { - packToIndex[blob.PackID] = restic.NewIDSet() + if res.Err != nil { + errs = append(errs, res.Err) + continue } - packToIndex[blob.PackID].Insert(idxID) - } - debug.Log("%d blobs processed", cnt) + c.indexes[res.ID] = res.Index + c.masterIndex.Insert(res.Index) + + debug.Log("process blobs") + cnt := 0 + for blob := range res.Index.Each(ctx) { + c.packs.Insert(blob.PackID) + c.blobs.Insert(blob.ID) + c.blobRefs.M[blob.ID] = 0 + cnt++ + + if _, ok := packToIndex[blob.PackID]; !ok { + packToIndex[blob.PackID] = restic.NewIDSet() + } + packToIndex[blob.PackID].Insert(res.ID) + } + + debug.Log("%d blobs processed", cnt) + } + return nil + }) + + err := wg.Wait() + if err != nil { + errs = append(errs, err) } debug.Log("checking 
for duplicate packs") @@ -163,7 +188,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { } } - err := c.repo.SetIndex(c.masterIndex) + err = c.repo.SetIndex(c.masterIndex) if err != nil { debug.Log("SetIndex returned error: %v", err) errs = append(errs, err) @@ -281,31 +306,52 @@ func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (restic.ID sync.Mutex } - snapshotWorker := func(ctx context.Context, strID string) error { - id, err := restic.ParseID(strID) - if err != nil { - return err - } + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. + wg, ctx := errgroup.WithContext(ctx) - debug.Log("load snapshot %v", id) + ch := make(chan restic.ID) - treeID, err := loadTreeFromSnapshot(ctx, repo, id) - if err != nil { - errs.Lock() - errs.errs = append(errs.errs, err) - errs.Unlock() + // send list of snapshot files through ch, which is closed afterwards + wg.Go(func() error { + defer close(ch) + return repo.List(ctx, restic.SnapshotFile, func(id restic.ID, size int64) error { + select { + case <-ctx.Done(): + return nil + case ch <- id: + } return nil + }) + }) + + // a worker receives a snapshot ID from ch, loads the snapshot and the tree, + // and adds the result to errs and trees. 
+ worker := func() error { + for id := range ch { + debug.Log("load snapshot %v", id) + + treeID, err := loadTreeFromSnapshot(ctx, repo, id) + if err != nil { + errs.Lock() + errs.errs = append(errs.errs, err) + errs.Unlock() + continue + } + + debug.Log("snapshot %v has tree %v", id, treeID) + trees.Lock() + trees.IDs = append(trees.IDs, treeID) + trees.Unlock() } - - debug.Log("snapshot %v has tree %v", id, treeID) - trees.Lock() - trees.IDs = append(trees.IDs, treeID) - trees.Unlock() - return nil } - err := repository.FilesInParallel(ctx, repo.Backend(), restic.SnapshotFile, defaultParallelism, snapshotWorker) + for i := 0; i < defaultParallelism; i++ { + wg.Go(worker) + } + + err := wg.Wait() if err != nil { errs.errs = append(errs.errs, err) } diff --git a/internal/repository/parallel.go b/internal/repository/parallel.go deleted file mode 100644 index 154b58bfa..000000000 --- a/internal/repository/parallel.go +++ /dev/null @@ -1,65 +0,0 @@ -package repository - -import ( - "context" - - "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/restic" - "golang.org/x/sync/errgroup" -) - -// ParallelWorkFunc gets one file ID to work on. If an error is returned, -// processing stops. When the contect is cancelled the function should return. -type ParallelWorkFunc func(ctx context.Context, id string) error - -// ParallelIDWorkFunc gets one restic.ID to work on. If an error is returned, -// processing stops. When the context is cancelled the function should return. -type ParallelIDWorkFunc func(ctx context.Context, id restic.ID) error - -// FilesInParallel runs n workers of f in parallel, on the IDs that -// repo.List(t) yields. If f returns an error, the process is aborted and the -// first error is returned. 
-func FilesInParallel(ctx context.Context, repo restic.Lister, t restic.FileType, n int, f ParallelWorkFunc) error { - g, ctx := errgroup.WithContext(ctx) - - ch := make(chan string, n) - g.Go(func() error { - defer close(ch) - return repo.List(ctx, t, func(fi restic.FileInfo) error { - select { - case <-ctx.Done(): - case ch <- fi.Name: - } - return nil - }) - }) - - for i := 0; i < n; i++ { - g.Go(func() error { - for name := range ch { - err := f(ctx, name) - if err != nil { - return err - } - } - return nil - }) - } - - return g.Wait() -} - -// ParallelWorkFuncParseID converts a function that takes a restic.ID to a -// function that takes a string. Filenames that do not parse as a restic.ID -// are ignored. -func ParallelWorkFuncParseID(f ParallelIDWorkFunc) ParallelWorkFunc { - return func(ctx context.Context, s string) error { - id, err := restic.ParseID(s) - if err != nil { - debug.Log("invalid ID %q: %v", id, err) - return nil - } - - return f(ctx, id) - } -} diff --git a/internal/repository/parallel_test.go b/internal/repository/parallel_test.go deleted file mode 100644 index 7b4c4a583..000000000 --- a/internal/repository/parallel_test.go +++ /dev/null @@ -1,129 +0,0 @@ -package repository_test - -import ( - "context" - "math/rand" - "testing" - "time" - - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/restic" - - "github.com/restic/restic/internal/repository" - rtest "github.com/restic/restic/internal/test" -) - -type testIDs []string - -var lister = testIDs{ - "40bb581cd36de952985c97a3ff6b21df41ee897d4db2040354caa36a17ff5268", - "2e15811a4d14ffac66d36a9ff456019d8de4c10c949d45b643f8477d17e92ff3", - "70c11b3ed521ad6b76d905c002ca98b361fca06aca060a063432c7311155a4da", - "8056a33e75dccdda701b6c989c7ed0cb71bbb6da13c6427fe5986f0896cc91c0", - "79d8776200596aa0237b10d470f7b850b86f8a1a80988ef5c8bee2874ce992e2", - "f9f1f29791c6b79b90b35efd083f17a3b163bbbafb1a2fdf43d46d56cffda289", - 
"3834178d05d0f6dd07f872ee0262ff1ace0f0f375768227d3c902b0b66591369", - "66d5cc68c9186414806f366ae5493ce7f229212993750a4992be4030f6af28c5", - "ebca5af4f397944f68cd215e3dfa2b197a7ba0f7c17d65d9f7390d0a15cde296", - "d4511ce6ff732d106275a57e40745c599e987c0da44c42cddbef592aac102437", - "f366202f0bfeefaedd7b49e2f21a90d3cbddb97d257a74d788dd34e19a684dae", - "a5c17728ab2433cd50636dd5c6c7068c7a44f2999d09c46e8f528466da8a059d", - "bae0f9492b9b208233029b87692a1a55cbd7fbe1cf3f6d7bc693ac266a6d6f0e", - "9d500187913c7510d71d1902703d312c7aaa56f1e98351385b9535fdabae595e", - "ffbddd8a4c1e54d258bb3e16d3929b546b61af63cb560b3e3061a8bef5b24552", - "201bb3abf655e7ef71e79ed4fb1079b0502b5acb4d9fad5e72a0de690c50a386", - "08eb57bbd559758ea96e99f9b7688c30e7b3bcf0c4562ff4535e2d8edeffaeed", - "e50b7223b04985ff38d9e11d1cba333896ef4264f82bd5d0653a028bce70e542", - "65a9421cd59cc7b7a71dcd9076136621af607fb4701d2e5c2af23b6396cf2f37", - "995a655b3521c19b4d0c266222266d89c8fc62889597d61f45f336091e646d57", - "51ec6f0bce77ed97df2dd7ae849338c3a8155a057da927eedd66e3d61be769ad", - "7b3923a0c0666431efecdbf6cb171295ec1710b6595eebcba3b576b49d13e214", - "2cedcc3d14698bea7e4b0546f7d5d48951dd90add59e6f2d44b693fd8913717d", - "fd6770cbd54858fdbd3d7b4239b985e5599180064d93ca873f27e86e8407d011", - "9edc51d8e6e04d05c9757848c1bfbfdc8e86b6330982294632488922e59fdb1b", - "1a6c4fbb24ad724c968b2020417c3d057e6c89e49bdfb11d91006def65eab6a0", - "cb3b29808cd0adfa2dca1f3a04f98114fbccf4eb487cdd4022f49bd70eeb049b", - "f55edcb40c619e29a20e432f8aaddc83a649be2c2d1941ccdc474cd2af03d490", - "e8ccc1763a92de23566b95c3ad1414a098016ece69a885fc8a72782a7517d17c", - "0fe2e3db8c5a12ad7101a63a0fffee901be54319cfe146bead7aec851722f82d", - "36be45a6ae7c95ad97cee1b33023be324bce7a7b4b7036e24125679dd9ff5b44", - "1685ed1a57c37859fbef1f7efb7509f20b84ec17a765605de43104d2fa37884b", - "9d83629a6a004c505b100a0b5d0b246833b63aa067aa9b59e3abd6b74bc4d3a8", - "be49a66b60175c5e2ee273b42165f86ef11bb6518c1c79950bcd3f4c196c98bd", - 
"0fd89885d821761b4a890782908e75793028747d15ace3c6cbf0ad56582b4fa5", - "94a767519a4e352a88796604943841fea21429f3358b4d5d55596dbda7d15dce", - "8dd07994afe6e572ddc9698fb0d13a0d4c26a38b7992818a71a99d1e0ac2b034", - "f7380a6f795ed31fbeb2945c72c5fd1d45044e5ab152311e75e007fa530f5847", - "5ca1ce01458e484393d7e9c8af42b0ff37a73a2fee0f18e14cff0fb180e33014", - "8f44178be3fe0a2bd41f922576fb7a9b19d589754504be746f56c759df328fda", - "12d33847c2be711c989f37360dd7aa8537fd14972262a4530634a08fdf32a767", - "31e077f5080f78846a00093caff2b6b839519cc47516142eeba9c41d4072a605", - "14f01db8a0054e70222b76d2555d70114b4bf8a0f02084324af2df226f14a795", - "7f5dbbaf31b4551828e8e76cef408375db9fbcdcdb6b5949f2d1b0c4b8632132", - "42a5d9b9bb7e4a16f23ba916bcf87f38c1aa1f2de2ab79736f725850a8ff6a1b", - "e06f8f901ea708beba8712a11b6e2d0be7c4b018d0254204ef269bcdf5e8c6cc", - "d9ba75785bf45b0c4fd3b2365c968099242483f2f0d0c7c20306dac11fae96e9", - "428debbb280873907cef2ec099efe1566e42a59775d6ec74ded0c4048d5a6515", - "3b51049d4dae701098e55a69536fa31ad2be1adc17b631a695a40e8a294fe9c0", - "168f88aa4b105e9811f5f79439cc1a689be4eec77f3361d42f22fe8f7ddc74a9", - "0baa0ab2249b33d64449a899cb7bd8eae5231f0d4ff70f09830dc1faa2e4abee", - "0c3896d346b580306a49de29f3a78913a41e14b8461b124628c33a64636241f2", - "b18313f1651c15e100e7179aa3eb8ffa62c3581159eaf7f83156468d19781e42", - "996361f7d988e48267ccc7e930fed4637be35fe7562b8601dceb7a32313a14c8", - "dfb4e6268437d53048d22b811048cd045df15693fc6789affd002a0fc80a6e60", - "34dd044c228727f2226a0c9c06a3e5ceb5e30e31cb7854f8fa1cde846b395a58", -} - -func (tests testIDs) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - for i := 0; i < 500; i++ { - for _, id := range tests { - if ctx.Err() != nil { - return ctx.Err() - } - - fi := restic.FileInfo{ - Name: id, - } - - err := fn(fi) - if err != nil { - return err - } - } - } - - return nil -} - -func TestFilesInParallel(t *testing.T) { - f := func(ctx context.Context, id string) error { - time.Sleep(1 * 
time.Millisecond) - return nil - } - - for n := 1; n < 5; n++ { - err := repository.FilesInParallel(context.TODO(), lister, restic.DataFile, n*100, f) - rtest.OK(t, err) - } -} - -var errTest = errors.New("test error") - -func TestFilesInParallelWithError(t *testing.T) { - f := func(ctx context.Context, id string) error { - time.Sleep(1 * time.Millisecond) - - if rand.Float32() < 0.01 { - return errTest - } - - return nil - } - - for n := 1; n < 5; n++ { - err := repository.FilesInParallel(context.TODO(), lister, restic.DataFile, n*100, f) - if err != errTest { - t.Fatalf("wrong error returned, want %q, got %v", errTest, err) - } - } -} diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 1a6e5c505..880006cf5 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -18,6 +18,7 @@ import ( "github.com/restic/restic/internal/hashing" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" ) // Repository is used to access a repository in a backend. @@ -391,45 +392,84 @@ const loadIndexParallelism = 4 func (r *Repository) LoadIndex(ctx context.Context) error { debug.Log("Loading index") - errCh := make(chan error, 1) - indexes := make(chan *Index) + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. 
+ wg, ctx := errgroup.WithContext(ctx) - worker := func(ctx context.Context, id restic.ID) error { - idx, err := LoadIndex(ctx, r, id) - if err != nil { - fmt.Fprintf(os.Stderr, "%v, ignoring\n", err) + type FileInfo struct { + restic.ID + Size int64 + } + ch := make(chan FileInfo) + indexCh := make(chan *Index) + + // send list of index files through ch, which is closed afterwards + wg.Go(func() error { + defer close(ch) + return r.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { + select { + case <-ctx.Done(): + return nil + case ch <- FileInfo{id, size}: + } return nil - } + }) + }) - select { - case indexes <- idx: - case <-ctx.Done(): + // a worker receives an index ID from ch, loads the index, and sends it to indexCh + worker := func() error { + for fi := range ch { + idx, err := LoadIndex(ctx, r, fi.ID) + if err != nil { + fmt.Fprintf(os.Stderr, "%v, ignoring\n", err) + return nil + } + + select { + case indexCh <- idx: + case <-ctx.Done(): + } } return nil } - go func() { - defer close(indexes) - errCh <- FilesInParallel(ctx, r.be, restic.IndexFile, loadIndexParallelism, - ParallelWorkFuncParseID(worker)) - }() - - validIndex := restic.NewIDSet() - for idx := range indexes { - id, err := idx.ID() - if err == nil { - validIndex.Insert(id) - } - r.idx.Insert(idx) + // final closes indexCh after all workers have terminated + final := func() error { + close(indexCh) + return nil } - err := r.PrepareCache(validIndex) + // run workers on ch + wg.Go(func() error { + return RunWorkers(ctx, loadIndexParallelism, worker, final) + }) + + // receive decoded indexes + validIndex := restic.NewIDSet() + wg.Go(func() error { + for idx := range indexCh { + id, err := idx.ID() + if err == nil { + validIndex.Insert(id) + } + r.idx.Insert(idx) + } + return nil + }) + + err := wg.Wait() if err != nil { return err } - return <-errCh + // remove index files from the cache which have been removed in the repo + err = r.PrepareCache(validIndex) + if err != nil { + 
return err + } + + return nil } // PrepareCache initializes the local cache. indexIDs is the list of IDs of diff --git a/internal/repository/worker_group.go b/internal/repository/worker_group.go new file mode 100644 index 000000000..ab09d441f --- /dev/null +++ b/internal/repository/worker_group.go @@ -0,0 +1,35 @@ +package repository + +import ( + "context" + + "golang.org/x/sync/errgroup" +) + +// RunWorkers runs count instances of workerFunc using an errgroup.Group. +// After all workers have terminated, finalFunc is run. If an error occurs in +// one of the workers, it is returned. FinalFunc is always run, regardless of +// any other previous errors. +func RunWorkers(ctx context.Context, count int, workerFunc, finalFunc func() error) error { + wg, ctx := errgroup.WithContext(ctx) + + // run workers + for i := 0; i < count; i++ { + wg.Go(workerFunc) + } + + // wait for termination + err := wg.Wait() + + // make sure finalFunc is run + finalErr := finalFunc() + + // if the workers returned an error, return it to the caller (disregarding + // any error from finalFunc) + if err != nil { + return err + } + + // if not, return the value finalFunc returned + return finalErr +} From d51e9d1b98e3984cbfcf2ca1b6ce4b3ee2a18d80 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 24 Mar 2019 21:59:14 +0100 Subject: [PATCH 4/6] Add []byte to repo.LoadAndDecrypt and utils.LoadAll This commit changes the signatures for repository.LoadAndDecrypt and utils.LoadAll to allow passing in a []byte as the buffer to use. This buffer is enlarged as needed, and returned back to the caller for further use. In later commits, this allows reducing allocations by reusing a buffer for multiple calls, e.g. in a worker function. 
--- cmd/restic/cmd_cat.go | 6 +- internal/backend/test/tests.go | 10 +-- internal/backend/utils.go | 27 ++++++-- internal/backend/utils_test.go | 96 +++++++++++++++----------- internal/cache/backend_test.go | 4 +- internal/repository/index.go | 2 +- internal/repository/key.go | 2 +- internal/repository/repository.go | 27 ++++++-- internal/repository/repository_test.go | 2 +- internal/restic/repository.go | 7 +- 10 files changed, 112 insertions(+), 71 deletions(-) diff --git a/cmd/restic/cmd_cat.go b/cmd/restic/cmd_cat.go index e735daf88..0dffc0508 100644 --- a/cmd/restic/cmd_cat.go +++ b/cmd/restic/cmd_cat.go @@ -74,7 +74,7 @@ func runCat(gopts GlobalOptions, args []string) error { fmt.Println(string(buf)) return nil case "index": - buf, err := repo.LoadAndDecrypt(gopts.ctx, restic.IndexFile, id) + buf, err := repo.LoadAndDecrypt(gopts.ctx, nil, restic.IndexFile, id) if err != nil { return err } @@ -99,7 +99,7 @@ func runCat(gopts GlobalOptions, args []string) error { return nil case "key": h := restic.Handle{Type: restic.KeyFile, Name: id.String()} - buf, err := backend.LoadAll(gopts.ctx, repo.Backend(), h) + buf, err := backend.LoadAll(gopts.ctx, nil, repo.Backend(), h) if err != nil { return err } @@ -150,7 +150,7 @@ func runCat(gopts GlobalOptions, args []string) error { switch tpe { case "pack": h := restic.Handle{Type: restic.DataFile, Name: id.String()} - buf, err := backend.LoadAll(gopts.ctx, repo.Backend(), h) + buf, err := backend.LoadAll(gopts.ctx, nil, repo.Backend(), h) if err != nil { return err } diff --git a/internal/backend/test/tests.go b/internal/backend/test/tests.go index dec1e0bee..7e9f7f5ab 100644 --- a/internal/backend/test/tests.go +++ b/internal/backend/test/tests.go @@ -79,7 +79,7 @@ func (s *Suite) TestConfig(t *testing.T) { var testString = "Config" // create config and read it back - _, err := backend.LoadAll(context.TODO(), b, restic.Handle{Type: restic.ConfigFile}) + _, err := backend.LoadAll(context.TODO(), nil, b, 
restic.Handle{Type: restic.ConfigFile}) if err == nil { t.Fatalf("did not get expected error for non-existing config") } @@ -93,7 +93,7 @@ func (s *Suite) TestConfig(t *testing.T) { // same config for _, name := range []string{"", "foo", "bar", "0000000000000000000000000000000000000000000000000000000000000000"} { h := restic.Handle{Type: restic.ConfigFile, Name: name} - buf, err := backend.LoadAll(context.TODO(), b, h) + buf, err := backend.LoadAll(context.TODO(), nil, b, h) if err != nil { t.Fatalf("unable to read config with name %q: %+v", name, err) } @@ -491,7 +491,7 @@ func (s *Suite) TestSave(t *testing.T) { err := b.Save(context.TODO(), h, restic.NewByteReader(data)) test.OK(t, err) - buf, err := backend.LoadAll(context.TODO(), b, h) + buf, err := backend.LoadAll(context.TODO(), nil, b, h) test.OK(t, err) if len(buf) != len(data) { t.Fatalf("number of bytes does not match, want %v, got %v", len(data), len(buf)) @@ -584,7 +584,7 @@ func (s *Suite) TestSaveFilenames(t *testing.T) { continue } - buf, err := backend.LoadAll(context.TODO(), b, h) + buf, err := backend.LoadAll(context.TODO(), nil, b, h) if err != nil { t.Errorf("test %d failed: Load() returned %+v", i, err) continue @@ -734,7 +734,7 @@ func (s *Suite) TestBackend(t *testing.T) { // test Load() h := restic.Handle{Type: tpe, Name: ts.id} - buf, err := backend.LoadAll(context.TODO(), b, h) + buf, err := backend.LoadAll(context.TODO(), nil, b, h) test.OK(t, err) test.Equals(t, ts.data, string(buf)) diff --git a/internal/backend/utils.go b/internal/backend/utils.go index 222f210e5..1665aedc6 100644 --- a/internal/backend/utils.go +++ b/internal/backend/utils.go @@ -1,20 +1,33 @@ package backend import ( + "bytes" "context" "io" - "io/ioutil" "github.com/restic/restic/internal/restic" ) -// LoadAll reads all data stored in the backend for the handle. 
-func LoadAll(ctx context.Context, be restic.Backend, h restic.Handle) (buf []byte, err error) { - err = be.Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) { - buf, ierr = ioutil.ReadAll(rd) - return ierr +// LoadAll reads all data stored in the backend for the handle into the given +// buffer, which is truncated. If the buffer is not large enough or nil, a new +// one is allocated. +func LoadAll(ctx context.Context, buf []byte, be restic.Backend, h restic.Handle) ([]byte, error) { + err := be.Load(ctx, h, 0, 0, func(rd io.Reader) error { + // make sure this is idempotent, in case an error occurs this function may be called multiple times! + wr := bytes.NewBuffer(buf[:0]) + _, cerr := io.Copy(wr, rd) + if cerr != nil { + return cerr + } + buf = wr.Bytes() + return nil }) - return buf, err + + if err != nil { + return nil, err + } + + return buf, nil } // LimitedReadCloser wraps io.LimitedReader and exposes the Close() method. diff --git a/internal/backend/utils_test.go b/internal/backend/utils_test.go index 74929fd0b..a29add676 100644 --- a/internal/backend/utils_test.go +++ b/internal/backend/utils_test.go @@ -19,6 +19,7 @@ const MiB = 1 << 20 func TestLoadAll(t *testing.T) { b := mem.New() + var buf []byte for i := 0; i < 20; i++ { data := rtest.Random(23+i, rand.Intn(MiB)+500*KiB) @@ -28,7 +29,7 @@ func TestLoadAll(t *testing.T) { err := b.Save(context.TODO(), h, restic.NewByteReader(data)) rtest.OK(t, err) - buf, err := backend.LoadAll(context.TODO(), b, restic.Handle{Type: restic.DataFile, Name: id.String()}) + buf, err := backend.LoadAll(context.TODO(), buf, b, restic.Handle{Type: restic.DataFile, Name: id.String()}) rtest.OK(t, err) if len(buf) != len(data) { @@ -43,55 +44,66 @@ func TestLoadAll(t *testing.T) { } } -func TestLoadSmallBuffer(t *testing.T) { - b := mem.New() - - for i := 0; i < 20; i++ { - data := rtest.Random(23+i, rand.Intn(MiB)+500*KiB) - - id := restic.Hash(data) - h := restic.Handle{Name: id.String(), Type: restic.DataFile} - err := 
b.Save(context.TODO(), h, restic.NewByteReader(data)) - rtest.OK(t, err) - - buf, err := backend.LoadAll(context.TODO(), b, restic.Handle{Type: restic.DataFile, Name: id.String()}) - rtest.OK(t, err) - - if len(buf) != len(data) { - t.Errorf("length of returned buffer does not match, want %d, got %d", len(data), len(buf)) - continue - } - - if !bytes.Equal(buf, data) { - t.Errorf("wrong data returned") - continue - } +func save(t testing.TB, be restic.Backend, buf []byte) restic.Handle { + id := restic.Hash(buf) + h := restic.Handle{Name: id.String(), Type: restic.DataFile} + err := be.Save(context.TODO(), h, restic.NewByteReader(buf)) + if err != nil { + t.Fatal(err) } + return h } -func TestLoadLargeBuffer(t *testing.T) { +func TestLoadAllAppend(t *testing.T) { b := mem.New() - for i := 0; i < 20; i++ { - data := rtest.Random(23+i, rand.Intn(MiB)+500*KiB) + h1 := save(t, b, []byte("foobar test string")) + randomData := rtest.Random(23, rand.Intn(MiB)+500*KiB) + h2 := save(t, b, randomData) - id := restic.Hash(data) - h := restic.Handle{Name: id.String(), Type: restic.DataFile} - err := b.Save(context.TODO(), h, restic.NewByteReader(data)) - rtest.OK(t, err) + var tests = []struct { + handle restic.Handle + buf []byte + want []byte + }{ + { + handle: h1, + buf: nil, + want: []byte("foobar test string"), + }, + { + handle: h1, + buf: []byte("xxx"), + want: []byte("foobar test string"), + }, + { + handle: h2, + buf: nil, + want: randomData, + }, + { + handle: h2, + buf: make([]byte, 0, 200), + want: randomData, + }, + { + handle: h2, + buf: []byte("foobarbaz"), + want: randomData, + }, + } - buf, err := backend.LoadAll(context.TODO(), b, restic.Handle{Type: restic.DataFile, Name: id.String()}) - rtest.OK(t, err) + for _, test := range tests { + t.Run("", func(t *testing.T) { + buf, err := backend.LoadAll(context.TODO(), test.buf, b, test.handle) + if err != nil { + t.Fatal(err) + } - if len(buf) != len(data) { - t.Errorf("length of returned buffer does not match, 
want %d, got %d", len(data), len(buf)) - continue - } - - if !bytes.Equal(buf, data) { - t.Errorf("wrong data returned") - continue - } + if !bytes.Equal(buf, test.want) { + t.Errorf("wrong data returned, want %q, got %q", test.want, buf) + } + }) } } diff --git a/internal/cache/backend_test.go b/internal/cache/backend_test.go index b4cc431ac..872ddbde1 100644 --- a/internal/cache/backend_test.go +++ b/internal/cache/backend_test.go @@ -17,7 +17,7 @@ import ( ) func loadAndCompare(t testing.TB, be restic.Backend, h restic.Handle, data []byte) { - buf, err := backend.LoadAll(context.TODO(), be, h) + buf, err := backend.LoadAll(context.TODO(), nil, be, h) if err != nil { t.Fatal(err) } @@ -147,7 +147,7 @@ func TestErrorBackend(t *testing.T) { loadTest := func(wg *sync.WaitGroup, be restic.Backend) { defer wg.Done() - buf, err := backend.LoadAll(context.TODO(), be, h) + buf, err := backend.LoadAll(context.TODO(), nil, be, h) if err == testErr { return } diff --git a/internal/repository/index.go b/internal/repository/index.go index 8d6d64c3e..ef6661dfc 100644 --- a/internal/repository/index.go +++ b/internal/repository/index.go @@ -552,7 +552,7 @@ func DecodeOldIndex(buf []byte) (idx *Index, err error) { func LoadIndexWithDecoder(ctx context.Context, repo restic.Repository, id restic.ID, fn func([]byte) (*Index, error)) (idx *Index, err error) { debug.Log("Loading index %v", id) - buf, err := repo.LoadAndDecrypt(ctx, restic.IndexFile, id) + buf, err := repo.LoadAndDecrypt(ctx, nil, restic.IndexFile, id) if err != nil { return nil, err } diff --git a/internal/repository/key.go b/internal/repository/key.go index 46e3b912f..62558c0b3 100644 --- a/internal/repository/key.go +++ b/internal/repository/key.go @@ -184,7 +184,7 @@ func SearchKey(ctx context.Context, s *Repository, password string, maxKeys int, // LoadKey loads a key from the backend. 
func LoadKey(ctx context.Context, s *Repository, name string) (k *Key, err error) { h := restic.Handle{Type: restic.KeyFile, Name: name} - data, err := backend.LoadAll(ctx, s.be, h) + data, err := backend.LoadAll(ctx, nil, s.be, h) if err != nil { return nil, err } diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 880006cf5..6ab8ca595 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -9,7 +9,6 @@ import ( "io" "os" - "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/cache" "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/debug" @@ -67,15 +66,29 @@ func (r *Repository) PrefixLength(t restic.FileType) (int, error) { return restic.PrefixLength(r.be, t) } -// LoadAndDecrypt loads and decrypts data identified by t and id from the -// backend. -func (r *Repository) LoadAndDecrypt(ctx context.Context, t restic.FileType, id restic.ID) (buf []byte, err error) { +// LoadAndDecrypt loads and decrypts the file with the given type and ID, using +// the supplied buffer (which must be empty). If the buffer is nil, a new +// buffer will be allocated and returned. 
+func (r *Repository) LoadAndDecrypt(ctx context.Context, buf []byte, t restic.FileType, id restic.ID) ([]byte, error) { + if len(buf) != 0 { + panic("buf is not empty") + } + debug.Log("load %v with id %v", t, id) h := restic.Handle{Type: t, Name: id.String()} - buf, err = backend.LoadAll(ctx, r.be, h) + err := r.be.Load(ctx, h, 0, 0, func(rd io.Reader) error { + // make sure this call is idempotent, in case an error occurs + wr := bytes.NewBuffer(buf[:0]) + _, cerr := io.Copy(wr, rd) + if cerr != nil { + return cerr + } + buf = wr.Bytes() + return nil + }) + if err != nil { - debug.Log("error loading %v: %v", h, err) return nil, err } @@ -188,7 +201,7 @@ func (r *Repository) loadBlob(ctx context.Context, id restic.ID, t restic.BlobTy // LoadJSONUnpacked decrypts the data and afterwards calls json.Unmarshal on // the item. func (r *Repository) LoadJSONUnpacked(ctx context.Context, t restic.FileType, id restic.ID, item interface{}) (err error) { - buf, err := r.LoadAndDecrypt(ctx, t, id) + buf, err := r.LoadAndDecrypt(ctx, nil, t, id) if err != nil { return err } diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index 8ea203d59..43d70c533 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -244,7 +244,7 @@ func BenchmarkLoadAndDecrypt(b *testing.B) { b.SetBytes(int64(length)) for i := 0; i < b.N; i++ { - data, err := repo.LoadAndDecrypt(context.TODO(), restic.DataFile, storageID) + data, err := repo.LoadAndDecrypt(context.TODO(), nil, restic.DataFile, storageID) rtest.OK(b, err) if len(data) != length { b.Errorf("wanted %d bytes, got %d", length, len(data)) diff --git a/internal/restic/repository.go b/internal/restic/repository.go index ff8f38034..46d7379db 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -39,8 +39,11 @@ type Repository interface { SaveUnpacked(context.Context, FileType, []byte) (ID, error) SaveJSONUnpacked(context.Context, 
FileType, interface{}) (ID, error) - LoadJSONUnpacked(context.Context, FileType, ID, interface{}) error - LoadAndDecrypt(context.Context, FileType, ID) ([]byte, error) + LoadJSONUnpacked(ctx context.Context, t FileType, id ID, dest interface{}) error + // LoadAndDecrypt loads and decrypts the file with the given type and ID, + // using the supplied buffer (which must be empty). If the buffer is nil, a + // new buffer will be allocated and returned. + LoadAndDecrypt(ctx context.Context, buf []byte, t FileType, id ID) (data []byte, err error) LoadBlob(context.Context, BlobType, ID, []byte) (int, error) SaveBlob(context.Context, BlobType, []byte, ID) (ID, error) From 66efa425bf8b2d88b7a3b7ef1d1805e1cea0c023 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 24 Mar 2019 22:12:38 +0100 Subject: [PATCH 5/6] Reuse buffer in worker functions --- internal/checker/checker.go | 7 +++++-- internal/repository/index.go | 12 ++++++------ internal/repository/repository.go | 15 +++++++++------ 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 79b3cb4b9..c5bee4bef 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -108,14 +108,17 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { // a worker receives an index ID from ch, loads the index, and sends it to indexCh worker := func() error { + var buf []byte for fi := range ch { debug.Log("worker got file %v", fi.ID.Str()) - idx, err := repository.LoadIndexWithDecoder(ctx, c.repo, fi.ID, repository.DecodeIndex) + var err error + var idx *repository.Index + idx, buf, err = repository.LoadIndexWithDecoder(ctx, c.repo, buf[:0], fi.ID, repository.DecodeIndex) if errors.Cause(err) == repository.ErrOldIndexFormat { debug.Log("index %v has old format", fi.ID.Str()) hints = append(hints, ErrOldIndexFormat{fi.ID}) - idx, err = repository.LoadIndexWithDecoder(ctx, c.repo, fi.ID, 
repository.DecodeOldIndex) + idx, buf, err = repository.LoadIndexWithDecoder(ctx, c.repo, buf[:0], fi.ID, repository.DecodeOldIndex) } err = errors.Wrapf(err, "error loading index %v", fi.ID.Str()) diff --git a/internal/repository/index.go b/internal/repository/index.go index ef6661dfc..32d00882a 100644 --- a/internal/repository/index.go +++ b/internal/repository/index.go @@ -549,21 +549,21 @@ func DecodeOldIndex(buf []byte) (idx *Index, err error) { } // LoadIndexWithDecoder loads the index and decodes it with fn. -func LoadIndexWithDecoder(ctx context.Context, repo restic.Repository, id restic.ID, fn func([]byte) (*Index, error)) (idx *Index, err error) { +func LoadIndexWithDecoder(ctx context.Context, repo restic.Repository, buf []byte, id restic.ID, fn func([]byte) (*Index, error)) (*Index, []byte, error) { debug.Log("Loading index %v", id) - buf, err := repo.LoadAndDecrypt(ctx, nil, restic.IndexFile, id) + buf, err := repo.LoadAndDecrypt(ctx, buf[:0], restic.IndexFile, id) if err != nil { - return nil, err + return nil, buf[:0], err } - idx, err = fn(buf) + idx, err := fn(buf) if err != nil { debug.Log("error while decoding index %v: %v", id, err) - return nil, err + return nil, buf[:0], err } idx.id = id - return idx, nil + return idx, buf, nil } diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 6ab8ca595..cb78c25c1 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -431,11 +431,13 @@ func (r *Repository) LoadIndex(ctx context.Context) error { // a worker receives an index ID from ch, loads the index, and sends it to indexCh worker := func() error { + var buf []byte for fi := range ch { - idx, err := LoadIndex(ctx, r, fi.ID) - if err != nil { - fmt.Fprintf(os.Stderr, "%v, ignoring\n", err) - return nil + var err error + var idx *Index + idx, buf, err = LoadIndexWithDecoder(ctx, r, buf[:0], fi.ID, DecodeIndex) + if err != nil && errors.Cause(err) == ErrOldIndexFormat { + idx, buf, 
err = LoadIndexWithDecoder(ctx, r, buf[:0], fi.ID, DecodeOldIndex) } select { @@ -548,14 +550,15 @@ func (r *Repository) PrepareCache(indexIDs restic.IDSet) error { // LoadIndex loads the index id from backend and returns it. func LoadIndex(ctx context.Context, repo restic.Repository, id restic.ID) (*Index, error) { - idx, err := LoadIndexWithDecoder(ctx, repo, id, DecodeIndex) + idx, _, err := LoadIndexWithDecoder(ctx, repo, nil, id, DecodeIndex) if err == nil { return idx, nil } if errors.Cause(err) == ErrOldIndexFormat { fmt.Fprintf(os.Stderr, "index %v has old format\n", id.Str()) - return LoadIndexWithDecoder(ctx, repo, id, DecodeOldIndex) + idx, _, err := LoadIndexWithDecoder(ctx, repo, nil, id, DecodeOldIndex) + return idx, err } return nil, err From 73047388720bb30b9f766210af05bad6f769f331 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 25 Mar 2019 21:49:30 +0100 Subject: [PATCH 6/6] check: Reduce default parallelism from 40 to 5 --- internal/checker/checker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/checker/checker.go b/internal/checker/checker.go index c5bee4bef..fcb9db490 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -49,7 +49,7 @@ func New(repo restic.Repository) *Checker { return c } -const defaultParallelism = 40 +const defaultParallelism = 5 // ErrDuplicatePacks is returned when a pack is found in more than one index. type ErrDuplicatePacks struct {