diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 872b2d71e..625ad9b16 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -831,6 +831,9 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error +// maxUnusedRange is the largest gap (4 MiB) between the end of one requested blob and the start of the next that StreamPack keeps in a single streamPackPart call; a larger gap is skipped by starting a new part. +const maxUnusedRange = 4 * 1024 * 1024 + // StreamPack loads the listed blobs from the specified pack file. The plaintext blob is passed to // the handleBlobFn callback or an error if decryption failed or the blob hash does not match. In // case of download errors handleBlobFn might be called multiple times for the same blob. If the @@ -844,6 +847,29 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack sort.Slice(blobs, func(i, j int) bool { return blobs[i].Offset < blobs[j].Offset }) + + lowerIdx := 0 + lastPos := blobs[0].Offset + for i := 0; i < len(blobs); i++ { + if blobs[i].Offset < lastPos { + // don't wait for streamPackPart to fail + return errors.Errorf("overlapping blobs in pack %v", packID) + } + if blobs[i].Offset-lastPos > maxUnusedRange { + // load everything up to the skipped file section + err := streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:i], handleBlobFn) + if err != nil { + return err + } + lowerIdx = i + } + lastPos = blobs[i].Offset + blobs[i].Length + } + // load remainder + return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn) +} + +func streamPackPart(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { h := restic.Handle{Type: restic.PackFile, Name: packID.String(), ContainedBlobType: restic.DataBlob} dataStart := blobs[0].Offset diff --git a/internal/repository/repository_test.go 
b/internal/repository/repository_test.go index c0f96f7ad..b5b0ff92d 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -455,17 +455,19 @@ func testStreamPack(t *testing.T, version uint) { } blobSizes := []int{ + 5522811, 10, 5231, 18812, 123123, + 13522811, 12301, 892242, 28616, 13351, 252287, 188883, - 2522811, + 3522811, 18883, } @@ -481,6 +483,7 @@ func testStreamPack(t *testing.T, version uint) { packfileBlobs, packfile := buildPackfileWithoutHeader(t, blobSizes, &key, compress) + loadCalls := 0 load := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { data := packfile @@ -495,6 +498,7 @@ func testStreamPack(t *testing.T, version uint) { } data = data[:length] + loadCalls++ return fn(bytes.NewReader(data)) @@ -504,19 +508,20 @@ func testStreamPack(t *testing.T, version uint) { t.Run("regular", func(t *testing.T) { tests := []struct { blobs []restic.Blob + calls int }{ - {packfileBlobs[1:2]}, - {packfileBlobs[2:5]}, - {packfileBlobs[2:8]}, + {packfileBlobs[1:2], 1}, + {packfileBlobs[2:5], 1}, + {packfileBlobs[2:8], 1}, {[]restic.Blob{ packfileBlobs[0], - packfileBlobs[8], packfileBlobs[4], - }}, + packfileBlobs[2], + }, 1}, {[]restic.Blob{ packfileBlobs[0], packfileBlobs[len(packfileBlobs)-1], - }}, + }, 2}, } for _, test := range tests { @@ -542,6 +547,7 @@ func testStreamPack(t *testing.T, version uint) { wantBlobs[blob.ID] = 1 } + loadCalls = 0 err = repository.StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob) if err != nil { t.Fatal(err) @@ -550,6 +556,7 @@ func testStreamPack(t *testing.T, version uint) { if !cmp.Equal(wantBlobs, gotBlobs) { t.Fatal(cmp.Diff(wantBlobs, gotBlobs)) } + rtest.Equals(t, test.calls, loadCalls) }) } })