repository: StreamPack in parts if there are too large gaps

For large pack sizes we might be only interested in the first and last
blob of a pack file. Thus stream a pack file in multiple parts if the
gaps between requested blobs grow too large.
This commit is contained in:
Michael Eischer 2022-07-23 22:40:15 +02:00
parent 55a11c1396
commit 7266f07c87
2 changed files with 40 additions and 7 deletions

View File

@ -831,6 +831,9 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte
type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
// Skip sections with more than 4MB unused blobs
const maxUnusedRange = 4 * 1024 * 1024
// StreamPack loads the listed blobs from the specified pack file. The plaintext blob is passed to // StreamPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
// the handleBlobFn callback or an error if decryption failed or the blob hash does not match. In // the handleBlobFn callback or an error if decryption failed or the blob hash does not match. In
// case of download errors handleBlobFn might be called multiple times for the same blob. If the // case of download errors handleBlobFn might be called multiple times for the same blob. If the
@ -844,6 +847,29 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack
sort.Slice(blobs, func(i, j int) bool { sort.Slice(blobs, func(i, j int) bool {
return blobs[i].Offset < blobs[j].Offset return blobs[i].Offset < blobs[j].Offset
}) })
lowerIdx := 0
lastPos := blobs[0].Offset
for i := 0; i < len(blobs); i++ {
if blobs[i].Offset < lastPos {
// don't wait for streamPackPart to fail
return errors.Errorf("overlapping blobs in pack %v", packID)
}
if blobs[i].Offset-lastPos > maxUnusedRange {
// load everything up to the skipped file section
err := streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:i], handleBlobFn)
if err != nil {
return err
}
lowerIdx = i
}
lastPos = blobs[i].Offset + blobs[i].Length
}
// load remainder
return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn)
}
func streamPackPart(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
h := restic.Handle{Type: restic.PackFile, Name: packID.String(), ContainedBlobType: restic.DataBlob} h := restic.Handle{Type: restic.PackFile, Name: packID.String(), ContainedBlobType: restic.DataBlob}
dataStart := blobs[0].Offset dataStart := blobs[0].Offset

View File

@ -455,17 +455,19 @@ func testStreamPack(t *testing.T, version uint) {
} }
blobSizes := []int{ blobSizes := []int{
5522811,
10, 10,
5231, 5231,
18812, 18812,
123123, 123123,
13522811,
12301, 12301,
892242, 892242,
28616, 28616,
13351, 13351,
252287, 252287,
188883, 188883,
2522811, 3522811,
18883, 18883,
} }
@ -481,6 +483,7 @@ func testStreamPack(t *testing.T, version uint) {
packfileBlobs, packfile := buildPackfileWithoutHeader(t, blobSizes, &key, compress) packfileBlobs, packfile := buildPackfileWithoutHeader(t, blobSizes, &key, compress)
loadCalls := 0
load := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { load := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
data := packfile data := packfile
@ -495,6 +498,7 @@ func testStreamPack(t *testing.T, version uint) {
} }
data = data[:length] data = data[:length]
loadCalls++
return fn(bytes.NewReader(data)) return fn(bytes.NewReader(data))
@ -504,19 +508,20 @@ func testStreamPack(t *testing.T, version uint) {
t.Run("regular", func(t *testing.T) { t.Run("regular", func(t *testing.T) {
tests := []struct { tests := []struct {
blobs []restic.Blob blobs []restic.Blob
calls int
}{ }{
{packfileBlobs[1:2]}, {packfileBlobs[1:2], 1},
{packfileBlobs[2:5]}, {packfileBlobs[2:5], 1},
{packfileBlobs[2:8]}, {packfileBlobs[2:8], 1},
{[]restic.Blob{ {[]restic.Blob{
packfileBlobs[0], packfileBlobs[0],
packfileBlobs[8],
packfileBlobs[4], packfileBlobs[4],
}}, packfileBlobs[2],
}, 1},
{[]restic.Blob{ {[]restic.Blob{
packfileBlobs[0], packfileBlobs[0],
packfileBlobs[len(packfileBlobs)-1], packfileBlobs[len(packfileBlobs)-1],
}}, }, 2},
} }
for _, test := range tests { for _, test := range tests {
@ -542,6 +547,7 @@ func testStreamPack(t *testing.T, version uint) {
wantBlobs[blob.ID] = 1 wantBlobs[blob.ID] = 1
} }
loadCalls = 0
err = repository.StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob) err = repository.StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -550,6 +556,7 @@ func testStreamPack(t *testing.T, version uint) {
if !cmp.Equal(wantBlobs, gotBlobs) { if !cmp.Equal(wantBlobs, gotBlobs) {
t.Fatal(cmp.Diff(wantBlobs, gotBlobs)) t.Fatal(cmp.Diff(wantBlobs, gotBlobs))
} }
rtest.Equals(t, test.calls, loadCalls)
}) })
} }
}) })