From 621012dac08f88b844349d5a976a3f336a883be1 Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Sun, 31 Dec 2023 15:27:36 +0100
Subject: [PATCH 1/5] repository: Add blob loading fallback to LoadBlobsFromPack

Try to retrieve individual blobs via LoadBlob if streaming did not work.
---
 internal/repository/repack.go               |  9 +-
 internal/repository/repository.go           | 35 ++++++--
 .../repository/repository_internal_test.go  | 86 +++++++++++++++++--
 3 files changed, 109 insertions(+), 21 deletions(-)

diff --git a/internal/repository/repack.go b/internal/repository/repack.go
index 5588984f6..e839a9c0f 100644
--- a/internal/repository/repack.go
+++ b/internal/repository/repack.go
@@ -79,13 +79,8 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
 	for t := range downloadQueue {
 		err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
 			if err != nil {
-				var ierr error
-				// check whether we can get a valid copy somewhere else
-				buf, ierr = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
-				if ierr != nil {
-					// no luck, return the original error
-					return err
-				}
+				// a required blob couldn't be retrieved
+				return err
 			}

 			keepMutex.Lock()
diff --git a/internal/repository/repository.go b/internal/repository/repository.go
index 8e34c7125..f2cde014a 100644
--- a/internal/repository/repository.go
+++ b/internal/repository/repository.go
@@ -943,6 +943,7 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte
 }

 type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error
+type loadBlobFn func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error)

 // Skip sections with more than 4MB unused blobs
 const maxUnusedRange = 4 * 1024 * 1024
@@ -952,10 +953,10 @@ const maxUnusedRange = 4 * 1024 * 1024
 // handleBlobFn is called at most once for each blob. If the callback returns an error,
 // then LoadBlobsFromPack will abort and not retry it.
 func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
-	return streamPack(ctx, r.Backend().Load, r.key, packID, blobs, handleBlobFn)
+	return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.key, packID, blobs, handleBlobFn)
 }

-func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
+func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
 	if len(blobs) == 0 {
 		// nothing to do
 		return nil
@@ -974,7 +975,7 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
 		}
 		if blobs[i].Offset-lastPos > maxUnusedRange {
 			// load everything up to the skipped file section
-			err := streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:i], handleBlobFn)
+			err := streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:i], handleBlobFn)
 			if err != nil {
 				return err
 			}
@@ -983,10 +984,10 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
 		lastPos = blobs[i].Offset + blobs[i].Length
 	}
 	// load remainder
-	return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn)
+	return streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:], handleBlobFn)
 }

-func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
+func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
 	h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false}

 	dataStart := blobs[0].Offset
@@ -1022,6 +1023,17 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key,
 				return err
 			}

+			if val.Err != nil && loadBlobFn != nil {
+				var ierr error
+				// check whether we can get a valid copy somewhere else
+				buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
+				if ierr == nil {
+					// success
+					val.Plaintext = buf
+					val.Err = nil
+				}
+			}
+
 			err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
 			if err != nil {
 				cancel()
@@ -1032,6 +1044,19 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key,
 		}
 		return nil
 	})
+
+	// the context is only still valid if handleBlobFn never returned an error
+	if ctx.Err() == nil && loadBlobFn != nil {
+		// check whether we can get the remaining blobs somewhere else
+		for _, entry := range blobs {
+			buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
+			err = handleBlobFn(entry.BlobHandle, buf, ierr)
+			if err != nil {
+				break
+			}
+		}
+	}
+
 	return errors.Wrap(err, "StreamPack")
 }
diff --git a/internal/repository/repository_internal_test.go b/internal/repository/repository_internal_test.go
index 0c7115bc9..1f71b17de 100644
--- a/internal/repository/repository_internal_test.go
+++ b/internal/repository/repository_internal_test.go
@@ -147,13 +147,7 @@ func TestStreamPack(t *testing.T) {

 func testStreamPack(t *testing.T, version uint) {
 	// always use the same key for deterministic output
-	const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
-
-	var key crypto.Key
-	err := json.Unmarshal([]byte(jsonKey), &key)
-	if err != nil {
-		t.Fatal(err)
-	}
+	key := testKey(t)

 	blobSizes := []int{
 		5522811,
@@ -276,7 +270,7 @@ func testStreamPack(t *testing.T, version uint) {
 			loadCalls = 0
 			shortFirstLoad = test.shortFirstLoad

-			err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
+			err := streamPack(ctx, load, nil, &key, restic.ID{}, test.blobs, handleBlob)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -339,7 +333,7 @@ func testStreamPack(t *testing.T, version uint) {
 				return err
 			}

-			err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
+			err := streamPack(ctx, load, nil, &key, restic.ID{}, test.blobs, handleBlob)
 			if err == nil {
 				t.Fatalf("wanted error %v, got nil", test.err)
 			}
@@ -449,3 +443,77 @@ func TestUnpackedVerification(t *testing.T) {
 		}
 	}
 }
+
+func testKey(t *testing.T) crypto.Key {
+	const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
+
+	var key crypto.Key
+	err := json.Unmarshal([]byte(jsonKey), &key)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return key
+}
+
+func TestStreamPackFallback(t *testing.T) {
+	test := func(t *testing.T, failLoad bool) {
+		key := testKey(t)
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		plaintext := rtest.Random(800, 42)
+		blobID := restic.Hash(plaintext)
+		blobs := []restic.Blob{
+			{
+				Length: uint(crypto.CiphertextLength(len(plaintext))),
+				Offset: 0,
+				BlobHandle: restic.BlobHandle{
+					ID:   blobID,
+					Type: restic.DataBlob,
+				},
+			},
+		}
+
+		var loadPack backendLoadFn
+		if failLoad {
+			loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+				return errors.New("load error")
+			}
+		} else {
+			loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+				// just return an empty array to provoke an error
+				data := make([]byte, length)
+				return fn(bytes.NewReader(data))
+			}
+		}
+
+		loadBlob := func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error) {
+			if id == blobID {
+				return plaintext, nil
+			}
+			return nil, errors.New("unknown blob")
+		}
+
+		blobOK := false
+		handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error {
+			rtest.OK(t, err)
+			rtest.Equals(t, blobID, blob.ID)
+			rtest.Equals(t, plaintext, buf)
+			blobOK = true
+			return err
+		}
+
+		err := streamPack(ctx, loadPack, loadBlob, &key, restic.ID{}, blobs, handleBlob)
+		rtest.OK(t, err)
+		rtest.Assert(t, blobOK, "blob failed to load")
+	}
+
+	t.Run("corrupted blob", func(t *testing.T) {
+		test(t, false)
+	})
+
+	// test fallback for failed pack loading
+	t.Run("failed load", func(t *testing.T) {
+		test(t, true)
+	})
+}

From 666a0b0bdbc66129b5832de34cbaa6b1d0c3b2bb Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Mon, 22 Apr 2024 20:53:31 +0200
Subject: [PATCH 2/5] repository: streamPack: replace streaming with chunked
 download

Due to the interface of streamPack, we cannot guarantee that operations
progress fast enough that the underlying connections remain open. This
introduces partial failures which massively complicate the error handling.

Switch to a simpler approach that retrieves the pack in chunks of 32MB.
If a blob is larger than this limit, then it is downloaded separately.
To avoid multiple copies in memory, an auxiliary interface `discardReader`
is introduced that allows directly accessing the downloaded byte slices,
while still supporting the streaming used by the `check` command.
---
 internal/checker/checker.go       |  36 ++++++-
 internal/repository/repository.go | 168 ++++++++++++++++++------
 2 files changed, 134 insertions(+), 70 deletions(-)

diff --git a/internal/checker/checker.go b/internal/checker/checker.go
index 28f55ce3a..d2fc42ca6 100644
--- a/internal/checker/checker.go
+++ b/internal/checker/checker.go
@@ -561,7 +561,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
 		hrd := hashing.NewReader(rd, sha256.New())
 		bufRd.Reset(hrd)

-		it := repository.NewPackBlobIterator(id, bufRd, 0, blobs, r.Key(), dec)
+		it := repository.NewPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec)
 		for {
 			val, err := it.Next()
 			if err == repository.ErrPackEOF {
@@ -647,11 +647,41 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
 	return nil
 }

+type bufReader struct {
+	rd  *bufio.Reader
+	buf []byte
+}
+
+func newBufReader(rd *bufio.Reader) *bufReader {
+	return &bufReader{
+		rd: rd,
+	}
+}
+
+func (b *bufReader) Discard(n int) (discarded int, err error) {
+	return b.rd.Discard(n)
+}
+
+func (b *bufReader) ReadFull(n int) (buf []byte, err error) {
+	if cap(b.buf) < n {
+		b.buf = make([]byte, n)
+	}
+	b.buf = b.buf[:n]
+
+	_, err = io.ReadFull(b.rd, b.buf)
+	if err != nil {
+		return nil, err
+	}
+	return b.buf, nil
+}
+
 // ReadData loads all data from the repository and checks the integrity.
 func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
 	c.ReadPacks(ctx, c.packs, nil, errChan)
 }

+const maxStreamBufferSize = 4 * 1024 * 1024
+
 // ReadPacks loads data from specified packs and checks the integrity.
 func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
 	defer close(errChan)
@@ -669,9 +699,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
 	// run workers
 	for i := 0; i < workerCount; i++ {
 		g.Go(func() error {
-			// create a buffer that is large enough to be reused by repository.StreamPack
-			// this ensures that we can read the pack header later on
-			bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize)
+			bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize)
 			dec, err := zstd.NewReader(nil)
 			if err != nil {
 				panic(dec)
diff --git a/internal/repository/repository.go b/internal/repository/repository.go
index f2cde014a..41f22f307 100644
--- a/internal/repository/repository.go
+++ b/internal/repository/repository.go
@@ -1,7 +1,6 @@
 package repository

 import (
-	"bufio"
 	"bytes"
 	"context"
 	"fmt"
@@ -11,7 +10,6 @@ import (
 	"sort"
 	"sync"

-	"github.com/cenkalti/backoff/v4"
 	"github.com/klauspost/compress/zstd"
 	"github.com/restic/chunker"
 	"github.com/restic/restic/internal/backend"
@@ -28,8 +26,6 @@ import (
 	"golang.org/x/sync/errgroup"
 )

-const MaxStreamBufferSize = 4 * 1024 * 1024
-
 const MinPackSize = 4 * 1024 * 1024
 const DefaultPackSize = 16 * 1024 * 1024
 const MaxPackSize = 128 * 1024 * 1024
@@ -951,7 +947,8 @@ const maxUnusedRange = 4 * 1024 * 1024
 // LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
 // the handleBlobFn callback or an error if decryption failed or the blob hash does not match.
 // handleBlobFn is called at most once for each blob. If the callback returns an error,
-// then LoadBlobsFromPack will abort and not retry it.
+// then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within
+// this specific call. The callback must not keep a reference to buf.
 func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
 	return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.key, packID, blobs, handleBlobFn)
 }
@@ -968,12 +965,27 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn
 	lowerIdx := 0
 	lastPos := blobs[0].Offset

+	const maxChunkSize = 2 * DefaultPackSize
+
 	for i := 0; i < len(blobs); i++ {
 		if blobs[i].Offset < lastPos {
 			// don't wait for streamPackPart to fail
 			return errors.Errorf("overlapping blobs in pack %v", packID)
 		}
+
+		chunkSizeAfter := (blobs[i].Offset + blobs[i].Length) - blobs[lowerIdx].Offset
+		split := false
+		// split if the chunk would become larger than maxChunkSize. Oversized chunks are
+		// handled by the requirement that the chunk contains at least one blob (i > lowerIdx)
+		if i > lowerIdx && chunkSizeAfter >= maxChunkSize {
+			split = true
+		}
+		// skip too large gaps as a new request is typically much cheaper than data transfers
 		if blobs[i].Offset-lastPos > maxUnusedRange {
+			split = true
+		}
+
+		if split {
 			// load everything up to the skipped file section
 			err := streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:i], handleBlobFn)
 			if err != nil {
@@ -1001,75 +1013,108 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBl
 	}
 	defer dec.Close()

-	ctx, cancel := context.WithCancel(ctx)
-
 	// stream blobs in pack
+	data := make([]byte, int(dataEnd-dataStart))
 	err = beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
-		// prevent callbacks after cancellation
-		if ctx.Err() != nil {
-			return ctx.Err()
-		}
-		bufferSize := int(dataEnd - dataStart)
-		if bufferSize > MaxStreamBufferSize {
-			bufferSize = MaxStreamBufferSize
-		}
-		bufRd := bufio.NewReaderSize(rd, bufferSize)
-		it := NewPackBlobIterator(packID, bufRd, dataStart, blobs, key, dec)
-
-		for {
-			val, err := it.Next()
-			if err == ErrPackEOF {
-				break
-			} else if err != nil {
-				return err
-			}
-
-			if val.Err != nil && loadBlobFn != nil {
-				var ierr error
-				// check whether we can get a valid copy somewhere else
-				buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
-				if ierr == nil {
-					// success
-					val.Plaintext = buf
-					val.Err = nil
-				}
-			}
-
-			err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
-			if err != nil {
-				cancel()
-				return backoff.Permanent(err)
-			}
-			// ensure that each blob is only passed once to handleBlobFn
-			blobs = blobs[1:]
-		}
-		return nil
-	})
+		_, cerr := io.ReadFull(rd, data)
+		return cerr
+	})
+	// prevent callbacks after cancellation
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+	if err != nil {
+		// the context is only still valid if handleBlobFn never returned an error
+		if loadBlobFn != nil {
+			// check whether we can get the remaining blobs somewhere else
+			for _, entry := range blobs {
+				buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
+				err = handleBlobFn(entry.BlobHandle, buf, ierr)
+				if err != nil {
+					break
+				}
+			}
+		}
+		return errors.Wrap(err, "StreamPack")
+	}

-	// the context is only still valid if handleBlobFn never returned an error
-	if ctx.Err() == nil && loadBlobFn != nil {
-		// check whether we can get the remaining blobs somewhere else
-		for _, entry := range blobs {
-			buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
-			err = handleBlobFn(entry.BlobHandle, buf, ierr)
-			if err != nil {
-				break
-			}
-		}
-	}
+	it := NewPackBlobIterator(packID, newByteReader(data), dataStart, blobs, key, dec)
+
+	for {
+		val, err := it.Next()
+		if err == ErrPackEOF {
+			break
+		} else if err != nil {
+			return err
+		}
+
+		if val.Err != nil && loadBlobFn != nil {
+			var ierr error
+			// check whether we can get a valid copy somewhere else
+			buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
+			if ierr == nil {
+				// success
+				val.Plaintext = buf
+				val.Err = nil
+			}
+		}
+
+		err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
+		if err != nil {
+			return err
+		}
+		// ensure that each blob is only passed once to handleBlobFn
+		blobs = blobs[1:]
+	}

 	return errors.Wrap(err, "StreamPack")
 }

+// discardReader allows the PackBlobIterator to perform zero copy
+// reads if the underlying data source is a byte slice.
+type discardReader interface {
+	Discard(n int) (discarded int, err error)
+	// ReadFull reads the next n bytes into a byte slice. The caller must not
+	// retain a reference to the byte. Modifications are only allowed within
+	// the boundaries of the returned slice.
+	ReadFull(n int) (buf []byte, err error)
+}
+
+type byteReader struct {
+	buf []byte
+}
+
+func newByteReader(buf []byte) *byteReader {
+	return &byteReader{
+		buf: buf,
+	}
+}
+
+func (b *byteReader) Discard(n int) (discarded int, err error) {
+	if len(b.buf) < n {
+		return 0, io.ErrUnexpectedEOF
+	}
+	b.buf = b.buf[n:]
+	return n, nil
+}
+
+func (b *byteReader) ReadFull(n int) (buf []byte, err error) {
+	if len(b.buf) < n {
+		return nil, io.ErrUnexpectedEOF
+	}
+	buf = b.buf[:n]
+	b.buf = b.buf[n:]
+	return buf, nil
+}
+
 type PackBlobIterator struct {
 	packID        restic.ID
-	rd            *bufio.Reader
+	rd            discardReader
 	currentOffset uint

 	blobs []restic.Blob
 	key   *crypto.Key
 	dec   *zstd.Decoder

-	buf    []byte
 	decode []byte
 }
@@ -1081,7 +1126,7 @@ type PackBlobValue struct {

 var ErrPackEOF = errors.New("reached EOF of pack file")

-func NewPackBlobIterator(packID restic.ID, rd *bufio.Reader, currentOffset uint,
+func NewPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint,
 	blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *PackBlobIterator {
 	return &PackBlobIterator{
 		packID: packID,
@@ -1116,21 +1161,12 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
 	h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
 	debug.Log("  process blob %v, skipped %d, %v", h, skipBytes, entry)

-	if uint(cap(b.buf)) < entry.Length {
-		b.buf = make([]byte, entry.Length)
-	}
-	b.buf = b.buf[:entry.Length]
-
-	n, err := io.ReadFull(b.rd, b.buf)
+	buf, err := b.rd.ReadFull(int(entry.Length))
 	if err != nil {
 		debug.Log("    read error %v", err)
 		return PackBlobValue{}, fmt.Errorf("readFull: %w", err)
 	}

-	if n != len(b.buf) {
-		return PackBlobValue{}, fmt.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
-			h, b.packID.Str(), len(b.buf), n)
-	}
 	b.currentOffset = entry.Offset + entry.Length

 	if int(entry.Length) <= b.key.NonceSize() {
@@ -1139,7 +1175,7 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
 	}

 	// decryption errors are likely permanent, give the caller a chance to skip them
-	nonce, ciphertext := b.buf[:b.key.NonceSize()], b.buf[b.key.NonceSize():]
+	nonce, ciphertext := buf[:b.key.NonceSize()], buf[b.key.NonceSize():]
 	plaintext, err := b.key.Open(ciphertext[:0], nonce, ciphertext, nil)
 	if err != nil {
 		err = fmt.Errorf("decrypting blob %v from %v failed: %w", h, b.packID.Str(), err)

From cf700d8794ea64b6b8ae95ddd742aeb4357f47d0 Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Mon, 22 Apr 2024 21:11:52 +0200
Subject: [PATCH 3/5] repository: streamPack: reuse zstd decoder

---
 internal/repository/repository.go           | 18 ++++++------------
 .../repository/repository_internal_test.go  | 18 +++++++++++++++---
 2 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/internal/repository/repository.go b/internal/repository/repository.go
index 41f22f307..3ed9f7afa 100644
--- a/internal/repository/repository.go
+++ b/internal/repository/repository.go
@@ -950,10 +950,10 @@ const maxUnusedRange = 4 * 1024 * 1024
 // then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within
 // this specific call. The callback must not keep a reference to buf.
 func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
-	return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.key, packID, blobs, handleBlobFn)
+	return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.getZstdDecoder(), r.key, packID, blobs, handleBlobFn)
 }

-func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
+func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
 	if len(blobs) == 0 {
 		// nothing to do
 		return nil
@@ -987,7 +987,7 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn

 		if split {
 			// load everything up to the skipped file section
-			err := streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:i], handleBlobFn)
+			err := streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:i], handleBlobFn)
 			if err != nil {
 				return err
 			}
@@ -996,10 +996,10 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn
 		lastPos = blobs[i].Offset + blobs[i].Length
 	}
 	// load remainder
-	return streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:], handleBlobFn)
+	return streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:], handleBlobFn)
 }

-func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
+func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
 	h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false}

 	dataStart := blobs[0].Offset
@@ -1007,14 +1007,8 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBl

 	debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs))

-	dec, err := zstd.NewReader(nil)
-	if err != nil {
-		panic(dec)
-	}
-	defer dec.Close()
-
 	data := make([]byte, int(dataEnd-dataStart))
-	err = beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
+	err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
 		_, cerr := io.ReadFull(rd, data)
 		return cerr
 	})
diff --git a/internal/repository/repository_internal_test.go b/internal/repository/repository_internal_test.go
index 1f71b17de..16e6e8484 100644
--- a/internal/repository/repository_internal_test.go
+++ b/internal/repository/repository_internal_test.go
@@ -146,6 +146,12 @@ func TestStreamPack(t *testing.T) {
 }

 func testStreamPack(t *testing.T, version uint) {
+	dec, err := zstd.NewReader(nil)
+	if err != nil {
+		panic(dec)
+	}
+	defer dec.Close()
+
 	// always use the same key for deterministic output
 	key := testKey(t)

@@ -270,7 +276,7 @@ func testStreamPack(t *testing.T, version uint) {
 			loadCalls = 0
 			shortFirstLoad = test.shortFirstLoad

-			err := streamPack(ctx, load, nil, &key, restic.ID{}, test.blobs, handleBlob)
+			err := streamPack(ctx, load, nil, dec, &key, restic.ID{}, test.blobs, handleBlob)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -333,7 +339,7 @@ func testStreamPack(t *testing.T, version uint) {
 				return err
 			}

-			err := streamPack(ctx, load, nil, &key, restic.ID{}, test.blobs, handleBlob)
+			err := streamPack(ctx, load, nil, dec, &key, restic.ID{}, test.blobs, handleBlob)
 			if err == nil {
 				t.Fatalf("wanted error %v, got nil", test.err)
 			}
@@ -456,6 +462,12 @@ func testKey(t *testing.T) crypto.Key {
 }

 func TestStreamPackFallback(t *testing.T) {
+	dec, err := zstd.NewReader(nil)
+	if err != nil {
+		panic(dec)
+	}
+	defer dec.Close()
+
 	test := func(t *testing.T, failLoad bool) {
 		key := testKey(t)
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()
@@ -503,7 +515,7 @@ func TestStreamPackFallback(t *testing.T) {
 			return err
 		}

-		err := streamPack(ctx, loadPack, loadBlob, &key, restic.ID{}, blobs, handleBlob)
+		err := streamPack(ctx, loadPack, loadBlob, dec, &key, restic.ID{}, blobs, handleBlob)
 		rtest.OK(t, err)
 		rtest.Assert(t, blobOK, "blob failed to load")
 	}

From 20d8eed400ac4a83260c84a0621235c537fbff59 Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Mon, 22 Apr 2024 21:12:20 +0200
Subject: [PATCH 4/5] repository: streamPack: separate requests for gaps
 larger than 1MB

With most cloud providers, traffic is much more expensive than API
calls. Thus slightly bias streamPack towards a few more API calls in
exchange for slightly less traffic.
---
 internal/repository/repository.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/internal/repository/repository.go b/internal/repository/repository.go
index 3ed9f7afa..84bac9ee3 100644
--- a/internal/repository/repository.go
+++ b/internal/repository/repository.go
@@ -941,8 +941,8 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte
 type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error
 type loadBlobFn func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error)

-// Skip sections with more than 4MB unused blobs
-const maxUnusedRange = 4 * 1024 * 1024
+// Skip sections with more than 1MB unused blobs
+const maxUnusedRange = 1 * 1024 * 1024

 // LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
 // the handleBlobFn callback or an error if decryption failed or the blob hash does not match.
 // handleBlobFn is called at most once for each blob.
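Note on the chunking heuristic (patches 2/5 and 4/5): streamPack starts a new
download request either when a chunk would grow past 32MB (2 * DefaultPackSize)
or when the gap to the next blob exceeds maxUnusedRange (1MB after patch 4/5).
The following standalone Go sketch mirrors that logic; the blobRef type and
splitRanges helper are illustrative only and not part of this series:

	package main

	import "fmt"

	type blobRef struct{ Offset, Length uint }

	const (
		defaultPackSize = 16 * 1024 * 1024
		maxChunkSize    = 2 * defaultPackSize // cap a single request at 32MB
		maxUnusedRange  = 1 * 1024 * 1024     // gaps above 1MB start a new request
	)

	// splitRanges groups blobs (sorted by offset, non-overlapping) into the
	// byte ranges that streamPack would fetch with separate requests.
	func splitRanges(blobs []blobRef) [][]blobRef {
		var chunks [][]blobRef
		lowerIdx := 0
		lastPos := blobs[0].Offset
		for i, b := range blobs {
			chunkSizeAfter := (b.Offset + b.Length) - blobs[lowerIdx].Offset
			// split if the chunk would exceed maxChunkSize (keeping at least
			// one blob per chunk) or if the gap since the last blob is too large
			if (i > lowerIdx && chunkSizeAfter >= maxChunkSize) || b.Offset-lastPos > maxUnusedRange {
				chunks = append(chunks, blobs[lowerIdx:i])
				lowerIdx = i
			}
			lastPos = b.Offset + b.Length
		}
		return append(chunks, blobs[lowerIdx:])
	}

	func main() {
		// two small blobs separated by a 2MB gap end up in two requests
		fmt.Println(splitRanges([]blobRef{{Offset: 0, Length: 1024}, {Offset: 2 * 1024 * 1024, Length: 1024}}))
	}

Oversized blobs are handled implicitly: a blob larger than maxChunkSize still
forms its own chunk, because every chunk contains at least one blob.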
From 676f0dc60d01bff28c3a3183c8db06fa391f0bc7 Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Wed, 1 May 2024 16:28:57 +0200
Subject: [PATCH 5/5] add changelog

---
 changelog/unreleased/issue-4627 | 8 ++++++++
 1 file changed, 8 insertions(+)
 create mode 100644 changelog/unreleased/issue-4627

diff --git a/changelog/unreleased/issue-4627 b/changelog/unreleased/issue-4627
new file mode 100644
index 000000000..626b341ea
--- /dev/null
+++ b/changelog/unreleased/issue-4627
@@ -0,0 +1,8 @@
+Enhancement: Improve reliability of backend operations
+
+Restic now downloads pack files in large chunks instead of using a streaming
+download. This prevents failures due to interrupted streams. The `restore`
+command now also retries downloading individual blobs that cannot be retrieved.
+
+https://github.com/restic/restic/issues/4627
+https://github.com/restic/restic/pull/4605
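Usage note: the callback contract documented in patch 2/5 (buf is only valid
within the call) matters to every caller of LoadBlobsFromPack. The helper
below is a hypothetical caller, not part of this series, sketching the
intended usage under that contract:

	// collectBlobs gathers the listed blobs from one pack file. After this
	// series, a non-nil err in the callback already reflects the per-blob
	// LoadBlob fallback, so it means no valid copy exists anywhere.
	func collectBlobs(ctx context.Context, repo restic.Repository, packID restic.ID, blobs []restic.Blob) (map[restic.BlobHandle][]byte, error) {
		keep := make(map[restic.BlobHandle][]byte, len(blobs))
		err := repo.LoadBlobsFromPack(ctx, packID, blobs, func(h restic.BlobHandle, buf []byte, err error) error {
			if err != nil {
				return err
			}
			// buf is reused after the callback returns, so copy retained data
			keep[h] = append([]byte(nil), buf...)
			return nil
		})
		return keep, err
	}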