From 9328f34d4363832344f450cbaa8e01d1cb135114 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 7 Jan 2024 12:00:32 +0100 Subject: [PATCH 1/5] restore: split downloadPack into smaller methods --- internal/restorer/filerestorer.go | 122 ++++++++++++++++-------------- 1 file changed, 65 insertions(+), 57 deletions(-) diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index 1fc74c7f0..7621e5ebb 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -197,12 +197,13 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error { return wg.Wait() } -func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { +type blobToFileOffsetsMapping map[restic.ID]struct { + files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file +} +func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { // calculate blob->[]files->[]offsets mappings - blobs := make(map[restic.ID]struct { - files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file - }) + blobs := make(blobToFileOffsetsMapping) var blobList []restic.Blob for file := range pack.files { addBlob := func(blob restic.Blob, fileOffset int64) { @@ -239,60 +240,9 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { } } - sanitizeError := func(file *fileInfo, err error) error { - if err != nil { - err = r.Error(file.location, err) - } - return err - } - // track already processed blobs for precise error reporting processedBlobs := restic.NewBlobSet() - err := repository.StreamPack(ctx, r.packLoader, r.key, pack.id, blobList, func(h restic.BlobHandle, blobData []byte, err error) error { - processedBlobs.Insert(h) - blob := blobs[h.ID] - if err != nil { - for file := range blob.files { - if errFile := sanitizeError(file, err); errFile != nil { - return errFile - } - } - return nil - } - for file, offsets := range blob.files { - for _, offset := range offsets { - writeToFile := func() error { - // this looks overly complicated and needs explanation - // two competing requirements: - // - must create the file once and only once - // - should allow concurrent writes to the file - // so write the first blob while holding file lock - // write other blobs after releasing the lock - createSize := int64(-1) - file.lock.Lock() - if file.inProgress { - file.lock.Unlock() - } else { - defer file.lock.Unlock() - file.inProgress = true - createSize = file.size - } - writeErr := r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize, file.sparse) - - if r.progress != nil { - r.progress.AddProgress(file.location, uint64(len(blobData)), uint64(file.size)) - } - - return writeErr - } - err := sanitizeError(file, writeToFile()) - if err != nil { - return err - } - } - } - return nil - }) + err := r.downloadBlobs(ctx, pack.id, blobList, blobs, processedBlobs) if err != nil { // only report error for not yet processed blobs @@ -308,7 +258,7 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { } for file := range affectedFiles { - if errFile := sanitizeError(file, err); errFile != nil { + if errFile := r.sanitizeError(file, err); errFile != nil { return errFile } } @@ -316,3 +266,61 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { return nil } + +func (r *fileRestorer) sanitizeError(file *fileInfo, err error) error { + if err != nil { + err = r.Error(file.location, err) + } + return err +} + 
+func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID, blobList []restic.Blob, + blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet) error { + + return repository.StreamPack(ctx, r.packLoader, r.key, packID, blobList, + func(h restic.BlobHandle, blobData []byte, err error) error { + processedBlobs.Insert(h) + blob := blobs[h.ID] + if err != nil { + for file := range blob.files { + if errFile := r.sanitizeError(file, err); errFile != nil { + return errFile + } + } + return nil + } + for file, offsets := range blob.files { + for _, offset := range offsets { + writeToFile := func() error { + // this looks overly complicated and needs explanation + // two competing requirements: + // - must create the file once and only once + // - should allow concurrent writes to the file + // so write the first blob while holding file lock + // write other blobs after releasing the lock + createSize := int64(-1) + file.lock.Lock() + if file.inProgress { + file.lock.Unlock() + } else { + defer file.lock.Unlock() + file.inProgress = true + createSize = file.size + } + writeErr := r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize, file.sparse) + + if r.progress != nil { + r.progress.AddProgress(file.location, uint64(len(blobData)), uint64(file.size)) + } + + return writeErr + } + err := r.sanitizeError(file, writeToFile()) + if err != nil { + return err + } + } + } + return nil + }) +} From 00d18b7a8847796d709d7de6e7bce3daeaf493b5 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 7 Jan 2024 12:06:36 +0100 Subject: [PATCH 2/5] restore: cleanup downloadPack --- internal/restorer/filerestorer.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index 7621e5ebb..403651763 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -199,18 +199,18 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error { type blobToFileOffsetsMapping map[restic.ID]struct { files map[*fileInfo][]int64 // file -> offsets (plural!) 
of the blob in the file + blob restic.Blob } func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { // calculate blob->[]files->[]offsets mappings blobs := make(blobToFileOffsetsMapping) - var blobList []restic.Blob for file := range pack.files { addBlob := func(blob restic.Blob, fileOffset int64) { blobInfo, ok := blobs[blob.ID] if !ok { blobInfo.files = make(map[*fileInfo][]int64) - blobList = append(blobList, blob) + blobInfo.blob = blob blobs[blob.ID] = blobInfo } blobInfo.files[file] = append(blobInfo.files[file], fileOffset) @@ -242,17 +242,16 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { // track already processed blobs for precise error reporting processedBlobs := restic.NewBlobSet() - err := r.downloadBlobs(ctx, pack.id, blobList, blobs, processedBlobs) + err := r.downloadBlobs(ctx, pack.id, blobs, processedBlobs) if err != nil { // only report error for not yet processed blobs affectedFiles := make(map[*fileInfo]struct{}) - for _, blob := range blobList { - if processedBlobs.Has(blob.BlobHandle) { + for _, entry := range blobs { + if processedBlobs.Has(entry.blob.BlobHandle) { continue } - blob := blobs[blob.ID] - for file := range blob.files { + for file := range entry.files { affectedFiles[file] = struct{}{} } } @@ -274,9 +273,13 @@ func (r *fileRestorer) sanitizeError(file *fileInfo, err error) error { return err } -func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID, blobList []restic.Blob, +func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID, blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet) error { + blobList := make([]restic.Blob, 0, len(blobs)) + for _, entry := range blobs { + blobList = append(blobList, entry.blob) + } return repository.StreamPack(ctx, r.packLoader, r.key, packID, blobList, func(h restic.BlobHandle, blobData []byte, err error) error { processedBlobs.Insert(h) From 226791041852a522e71d1145f3361658609b05fe Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 7 Jan 2024 12:20:31 +0100 Subject: [PATCH 3/5] restore: split error reporting from downloadPack --- internal/restorer/filerestorer.go | 45 +++++++++++++++++-------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index 403651763..f2e2cf24a 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -244,26 +244,7 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { processedBlobs := restic.NewBlobSet() err := r.downloadBlobs(ctx, pack.id, blobs, processedBlobs) - if err != nil { - // only report error for not yet processed blobs - affectedFiles := make(map[*fileInfo]struct{}) - for _, entry := range blobs { - if processedBlobs.Has(entry.blob.BlobHandle) { - continue - } - for file := range entry.files { - affectedFiles[file] = struct{}{} - } - } - - for file := range affectedFiles { - if errFile := r.sanitizeError(file, err); errFile != nil { - return errFile - } - } - } - - return nil + return r.reportError(blobs, processedBlobs, err) } func (r *fileRestorer) sanitizeError(file *fileInfo, err error) error { @@ -273,6 +254,30 @@ func (r *fileRestorer) sanitizeError(file *fileInfo, err error) error { return err } +func (r *fileRestorer) reportError(blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet, err error) error { + if err == nil { + return nil + } + + // only report error for not yet processed blobs + 
affectedFiles := make(map[*fileInfo]struct{}) + for _, entry := range blobs { + if processedBlobs.Has(entry.blob.BlobHandle) { + continue + } + for file := range entry.files { + affectedFiles[file] = struct{}{} + } + } + + for file := range affectedFiles { + if errFile := r.sanitizeError(file, err); errFile != nil { + return errFile + } + } + return nil +} + func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID, blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet) error { From e78be75d1efdf51051d1721e5bb48d041bc03ddb Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 7 Jan 2024 12:17:35 +0100 Subject: [PATCH 4/5] restore: separately restore blobs that are frequently referenced Writing these blobs to their files can take a long time and consequently cause the backend connection to time out. Avoid that by retrieving these blobs separately. --- internal/restorer/filerestorer.go | 27 +++++++++++++++++++++++++- internal/restorer/filerestorer_test.go | 21 ++++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index f2e2cf24a..99a460321 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -242,8 +242,33 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { // track already processed blobs for precise error reporting processedBlobs := restic.NewBlobSet() - err := r.downloadBlobs(ctx, pack.id, blobs, processedBlobs) + for _, entry := range blobs { + occurrences := 0 + for _, offsets := range entry.files { + occurrences += len(offsets) + } + // With a maximum blob size of 8MB, the normal blob streaming has to write + // at most 800MB for a single blob. This should be short enough to avoid + // network connection timeouts. Based on a quick test, a limit of 100 only + // selects a very small number of blobs (the number of references per blob + // - aka. 
`count` - seems to follow an exponential distribution)
+		if occurrences > 100 {
+			// process frequently referenced blobs first as these can take a long time to write
+			// which can cause backend connections to time out
+			delete(blobs, entry.blob.ID)
+			partialBlobs := blobToFileOffsetsMapping{entry.blob.ID: entry}
+			err := r.downloadBlobs(ctx, pack.id, partialBlobs, processedBlobs)
+			if err := r.reportError(blobs, processedBlobs, err); err != nil {
+				return err
+			}
+		}
+	}
+
+	if len(blobs) == 0 {
+		return nil
+	}
+
 	err := r.downloadBlobs(ctx, pack.id, blobs, processedBlobs)
 	return r.reportError(blobs, processedBlobs, err)
 }
diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go
index 94b068159..c5bc3fe31 100644
--- a/internal/restorer/filerestorer_test.go
+++ b/internal/restorer/filerestorer_test.go
@@ -248,6 +248,27 @@ func TestFileRestorerPackSkip(t *testing.T) {
 	}
 }
 
+func TestFileRestorerFrequentBlob(t *testing.T) {
+	tempdir := rtest.TempDir(t)
+
+	for _, sparse := range []bool{false, true} {
+		blobs := []TestBlob{
+			{"data1-1", "pack1-1"},
+		}
+		for i := 0; i < 10000; i++ {
+			blobs = append(blobs, TestBlob{"a", "pack1-1"})
+		}
+		blobs = append(blobs, TestBlob{"end", "pack1-1"})
+
+		restoreAndVerify(t, tempdir, []TestFile{
+			{
+				name:  "file1",
+				blobs: blobs,
+			},
+		}, nil, sparse)
+	}
+}
+
 func TestErrorRestoreFiles(t *testing.T) {
 	tempdir := rtest.TempDir(t)
 	content := []TestFile{

From 4ea3796455a3bb43dba03c268fbe3d9e787d133c Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Sun, 7 Jan 2024 14:18:20 +0100
Subject: [PATCH 5/5] add changelog for reliable restores

---
 changelog/unreleased/pull-4626 | 11 +++++++++++
 1 file changed, 11 insertions(+)
 create mode 100644 changelog/unreleased/pull-4626

diff --git a/changelog/unreleased/pull-4626 b/changelog/unreleased/pull-4626
new file mode 100644
index 000000000..ea16d749f
--- /dev/null
+++ b/changelog/unreleased/pull-4626
@@ -0,0 +1,11 @@
+Bugfix: Improve reliability of restoring large files
+
+In some cases restic failed to restore large files that frequently contain the
+same file chunk. In combination with certain backends, this could result in
+network connection timeouts that caused incomplete restores.
+
+Restic now includes special handling for such file chunks to ensure reliable
+restores.
+
+https://github.com/restic/restic/pull/4626
+https://forum.restic.net/t/errors-restoring-with-restic-on-windows-server-s3/6943
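
Note on the locking dance in writeToFile (patch 1 carries it over unchanged into downloadBlobs): the comment in the hunk names the two competing requirements, and the pattern is easier to see in isolation. Below is a minimal, runnable sketch of the same create-once/write-concurrently idiom; fileState, writeBlob, and the callback signature are hypothetical stand-ins, not restic's actual types.

package main

import (
	"fmt"
	"sync"
)

// fileState is a hypothetical stand-in for restic's fileInfo: one lock and
// one inProgress flag per output file.
type fileState struct {
	lock       sync.Mutex
	inProgress bool
	size       int64
}

// writeBlob passes a real createSize only to the very first writer of a
// file. Holding the lock across that first write guarantees the file is
// created exactly once; all later writers release the lock immediately and
// write concurrently.
func writeBlob(f *fileState, write func(createSize int64) error) error {
	createSize := int64(-1) // -1 means "file already created, just write"
	f.lock.Lock()
	if f.inProgress {
		f.lock.Unlock() // file exists: safe to write without the lock
	} else {
		defer f.lock.Unlock() // first writer: keep lock until creation is done
		f.inProgress = true
		createSize = f.size
	}
	return write(createSize)
}

func main() {
	f := &fileState{size: 4096}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = writeBlob(f, func(createSize int64) error {
				fmt.Printf("writer %d got createSize %d\n", i, createSize)
				return nil
			})
		}(i)
	}
	wg.Wait()
}

Exactly one goroutine observes createSize == 4096 and creates the file while holding the lock; the others see -1 and proceed concurrently.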
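
Similarly, the heart of patch 4 is the per-blob occurrence count that decides whether a blob is streamed separately: with restic's 8MB maximum blob size, the threshold of 100 bounds the writes for a single streamed blob to roughly 800MB, per the comment in the hunk. Here is a self-contained sketch of just that selection step, again with hypothetical simplified types (blobID, fileRef, offsetsByFile) in place of restic's internal ones.

package main

import "fmt"

type blobID string

type fileRef struct{ name string }

// offsetsByFile maps each file to the offsets at which a blob occurs in it,
// mirroring the files field of blobToFileOffsetsMapping.
type offsetsByFile map[fileRef][]int64

// frequentBlobs returns the IDs of blobs whose total reference count across
// all files exceeds the threshold, i.e. the occurrence check in downloadPack.
func frequentBlobs(blobs map[blobID]offsetsByFile, threshold int) []blobID {
	var frequent []blobID
	for id, files := range blobs {
		occurrences := 0
		for _, offsets := range files {
			occurrences += len(offsets)
		}
		if occurrences > threshold {
			frequent = append(frequent, id)
		}
	}
	return frequent
}

func main() {
	blobs := map[blobID]offsetsByFile{
		"a": {fileRef{"file1"}: make([]int64, 150)}, // referenced 150 times
		"b": {fileRef{"file1"}: {0}},                // referenced once
	}
	fmt.Println(frequentBlobs(blobs, 100)) // prints [a]
}

Running it prints [a]: only the blob referenced more than 100 times is pulled out for separate download, which is what downloadPack does via partialBlobs before streaming the remainder of the pack in one go.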