diff --git a/internal/checker/checker.go b/internal/checker/checker.go
index 28f55ce3a..d2fc42ca6 100644
--- a/internal/checker/checker.go
+++ b/internal/checker/checker.go
@@ -561,7 +561,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
 	hrd := hashing.NewReader(rd, sha256.New())
 	bufRd.Reset(hrd)
 
-	it := repository.NewPackBlobIterator(id, bufRd, 0, blobs, r.Key(), dec)
+	it := repository.NewPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec)
 	for {
 		val, err := it.Next()
 		if err == repository.ErrPackEOF {
@@ -647,11 +647,41 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
 	return nil
 }
 
+type bufReader struct {
+	rd  *bufio.Reader
+	buf []byte
+}
+
+func newBufReader(rd *bufio.Reader) *bufReader {
+	return &bufReader{
+		rd: rd,
+	}
+}
+
+func (b *bufReader) Discard(n int) (discarded int, err error) {
+	return b.rd.Discard(n)
+}
+
+func (b *bufReader) ReadFull(n int) (buf []byte, err error) {
+	if cap(b.buf) < n {
+		b.buf = make([]byte, n)
+	}
+	b.buf = b.buf[:n]
+
+	_, err = io.ReadFull(b.rd, b.buf)
+	if err != nil {
+		return nil, err
+	}
+	return b.buf, nil
+}
+
 // ReadData loads all data from the repository and checks the integrity.
 func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
 	c.ReadPacks(ctx, c.packs, nil, errChan)
 }
 
+const maxStreamBufferSize = 4 * 1024 * 1024
+
 // ReadPacks loads data from specified packs and checks the integrity.
 func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
 	defer close(errChan)
@@ -669,9 +699,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
 	// run workers
 	for i := 0; i < workerCount; i++ {
 		g.Go(func() error {
-			// create a buffer that is large enough to be reused by repository.StreamPack
-			// this ensures that we can read the pack header later on
-			bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize)
+			bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize)
 			dec, err := zstd.NewReader(nil)
 			if err != nil {
 				panic(dec)
diff --git a/internal/repository/repository.go b/internal/repository/repository.go
index f2cde014a..41f22f307 100644
--- a/internal/repository/repository.go
+++ b/internal/repository/repository.go
@@ -1,7 +1,6 @@
 package repository
 
 import (
-	"bufio"
 	"bytes"
 	"context"
 	"fmt"
@@ -11,7 +10,6 @@ import (
 	"sort"
 	"sync"
 
-	"github.com/cenkalti/backoff/v4"
 	"github.com/klauspost/compress/zstd"
 	"github.com/restic/chunker"
 	"github.com/restic/restic/internal/backend"
@@ -28,8 +26,6 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-const MaxStreamBufferSize = 4 * 1024 * 1024
-
 const MinPackSize = 4 * 1024 * 1024
 const DefaultPackSize = 16 * 1024 * 1024
 const MaxPackSize = 128 * 1024 * 1024
@@ -951,7 +947,8 @@ const maxUnusedRange = 4 * 1024 * 1024
 // LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
 // the handleBlobFn callback or an error if decryption failed or the blob hash does not match.
 // handleBlobFn is called at most once for each blob. If the callback returns an error,
-// then LoadBlobsFromPack will abort and not retry it.
+// then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within
+// this specific call. The callback must not keep a reference to buf.
 func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
 	return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.key, packID, blobs, handleBlobFn)
 }
@@ -968,12 +965,27 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn
 	lowerIdx := 0
 	lastPos := blobs[0].Offset
 
+	const maxChunkSize = 2 * DefaultPackSize
+
 	for i := 0; i < len(blobs); i++ {
 		if blobs[i].Offset < lastPos {
 			// don't wait for streamPackPart to fail
 			return errors.Errorf("overlapping blobs in pack %v", packID)
 		}
+
+		chunkSizeAfter := (blobs[i].Offset + blobs[i].Length) - blobs[lowerIdx].Offset
+		split := false
+		// split if the chunk would become larger than maxChunkSize. Oversized chunks are
+		// handled by the requirement that the chunk contains at least one blob (i > lowerIdx)
+		if i > lowerIdx && chunkSizeAfter >= maxChunkSize {
+			split = true
+		}
+		// skip too large gaps as a new request is typically much cheaper than data transfers
 		if blobs[i].Offset-lastPos > maxUnusedRange {
+			split = true
+		}
+
+		if split {
 			// load everything up to the skipped file section
 			err := streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:i], handleBlobFn)
 			if err != nil {
@@ -1001,75 +1013,108 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBl
 	}
 	defer dec.Close()
 
-	ctx, cancel := context.WithCancel(ctx)
-	// stream blobs in pack
+	data := make([]byte, int(dataEnd-dataStart))
 	err = beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
-		// prevent callbacks after cancellation
-		if ctx.Err() != nil {
-			return ctx.Err()
-		}
-		bufferSize := int(dataEnd - dataStart)
-		if bufferSize > MaxStreamBufferSize {
-			bufferSize = MaxStreamBufferSize
-		}
-		bufRd := bufio.NewReaderSize(rd, bufferSize)
-		it := NewPackBlobIterator(packID, bufRd, dataStart, blobs, key, dec)
-
-		for {
-			val, err := it.Next()
-			if err == ErrPackEOF {
-				break
-			} else if err != nil {
-				return err
-			}
-
-			if val.Err != nil && loadBlobFn != nil {
-				var ierr error
-				// check whether we can get a valid copy somewhere else
-				buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
-				if ierr == nil {
-					// success
-					val.Plaintext = buf
-					val.Err = nil
+		_, cerr := io.ReadFull(rd, data)
+		return cerr
+	})
+	// prevent callbacks after cancellation
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+	if err != nil {
+		// the context is only still valid if handleBlobFn never returned an error
+		if loadBlobFn != nil {
+			// check whether we can get the remaining blobs somewhere else
+			for _, entry := range blobs {
+				buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
+				err = handleBlobFn(entry.BlobHandle, buf, ierr)
+				if err != nil {
+					break
 				}
 			}
+		}
-
-			err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
-			if err != nil {
-				cancel()
-				return backoff.Permanent(err)
-			}
-			// ensure that each blob is only passed once to handleBlobFn
-			blobs = blobs[1:]
-		}
-		return nil
-	})
+		return errors.Wrap(err, "StreamPack")
+	}
 
-	// the context is only still valid if handleBlobFn never returned an error
-	if ctx.Err() == nil && loadBlobFn != nil {
-		// check whether we can get the remaining blobs somewhere else
-		for _, entry := range blobs {
-			buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
-			err = handleBlobFn(entry.BlobHandle, buf, ierr)
-			if err != nil {
-				break
-			}
-		}
+	it := NewPackBlobIterator(packID, newByteReader(data), dataStart, blobs, key, dec)
+
+	for {
+		val, err := it.Next()
+		if err == ErrPackEOF {
+			break
+		} else if err != nil {
+			return err
+		}
+
+		if val.Err != nil && loadBlobFn != nil {
+			var ierr error
+			// check whether we can get a valid copy somewhere else
+			buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
+			if ierr == nil {
+				// success
+				val.Plaintext = buf
+				val.Err = nil
+			}
+		}
+
+		err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
+		if err != nil {
+			return err
+		}
+		// ensure that each blob is only passed once to handleBlobFn
+		blobs = blobs[1:]
 	}
 	return errors.Wrap(err, "StreamPack")
 }
 
+// discardReader allows the PackBlobIterator to perform zero copy
+// reads if the underlying data source is a byte slice.
+type discardReader interface {
+	Discard(n int) (discarded int, err error)
+	// ReadFull reads the next n bytes into a byte slice. The caller must not
+	// retain a reference to the byte slice. Modifications are only allowed within
+	// the boundaries of the returned slice.
+	ReadFull(n int) (buf []byte, err error)
+}
+
+type byteReader struct {
+	buf []byte
+}
+
+func newByteReader(buf []byte) *byteReader {
+	return &byteReader{
+		buf: buf,
+	}
+}
+
+func (b *byteReader) Discard(n int) (discarded int, err error) {
+	if len(b.buf) < n {
+		return 0, io.ErrUnexpectedEOF
+	}
+	b.buf = b.buf[n:]
+	return n, nil
+}
+
+func (b *byteReader) ReadFull(n int) (buf []byte, err error) {
+	if len(b.buf) < n {
+		return nil, io.ErrUnexpectedEOF
+	}
+	buf = b.buf[:n]
+	b.buf = b.buf[n:]
+	return buf, nil
+}
+
 type PackBlobIterator struct {
 	packID restic.ID
-	rd     *bufio.Reader
+	rd     discardReader
 
 	currentOffset uint
 
 	blobs []restic.Blob
 	key   *crypto.Key
 	dec   *zstd.Decoder
 
-	buf    []byte
 	decode []byte
 }
 
@@ -1081,7 +1126,7 @@ type PackBlobValue struct {
 
 var ErrPackEOF = errors.New("reached EOF of pack file")
 
-func NewPackBlobIterator(packID restic.ID, rd *bufio.Reader, currentOffset uint,
+func NewPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint,
 	blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *PackBlobIterator {
 	return &PackBlobIterator{
 		packID:        packID,
@@ -1116,21 +1161,12 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
 	h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
 	debug.Log("  process blob %v, skipped %d, %v", h, skipBytes, entry)
 
-	if uint(cap(b.buf)) < entry.Length {
-		b.buf = make([]byte, entry.Length)
-	}
-	b.buf = b.buf[:entry.Length]
-
-	n, err := io.ReadFull(b.rd, b.buf)
+	buf, err := b.rd.ReadFull(int(entry.Length))
 	if err != nil {
 		debug.Log("  read error %v", err)
 		return PackBlobValue{}, fmt.Errorf("readFull: %w", err)
 	}
-	if n != len(b.buf) {
-		return PackBlobValue{}, fmt.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
-			h, b.packID.Str(), len(b.buf), n)
-	}
 
 	b.currentOffset = entry.Offset + entry.Length
 
 	if int(entry.Length) <= b.key.NonceSize() {
@@ -1139,7 +1175,7 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
 	}
 
 	// decryption errors are likely permanent, give the caller a chance to skip them
-	nonce, ciphertext := b.buf[:b.key.NonceSize()], b.buf[b.key.NonceSize():]
+	nonce, ciphertext := buf[:b.key.NonceSize()], buf[b.key.NonceSize():]
 	plaintext, err := b.key.Open(ciphertext[:0], nonce, ciphertext, nil)
 	if err != nil {
 		err = fmt.Errorf("decrypting blob %v from %v failed: %w", h, b.packID.Str(), err)
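
The heart of this diff is the discardReader contract: PackBlobIterator no longer owns a scratch buffer but asks its source for the next n bytes, so a []byte-backed source can serve decryption in place with zero copies, while the checker keeps its streaming, copy-once behavior. The following self-contained sketch shows the two implementations from the diff side by side; the main harness, sample data, and string lengths are illustrative and not part of restic:

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// discardReader mirrors the interface added in the diff: strictly sequential
// access with explicit skips, so that a slice-backed source can hand out
// sub-slices of its buffer without copying.
type discardReader interface {
	Discard(n int) (discarded int, err error)
	ReadFull(n int) (buf []byte, err error)
}

// byteReader returns sub-slices of an in-memory buffer (zero copy).
type byteReader struct{ buf []byte }

func (b *byteReader) Discard(n int) (int, error) {
	if len(b.buf) < n {
		return 0, io.ErrUnexpectedEOF
	}
	b.buf = b.buf[n:]
	return n, nil
}

func (b *byteReader) ReadFull(n int) ([]byte, error) {
	if len(b.buf) < n {
		return nil, io.ErrUnexpectedEOF
	}
	buf := b.buf[:n]
	b.buf = b.buf[n:]
	return buf, nil
}

// bufReader adapts a bufio.Reader by copying into a reused scratch buffer,
// which is what the checker does when verifying streamed pack files.
type bufReader struct {
	rd  *bufio.Reader
	buf []byte
}

func (b *bufReader) Discard(n int) (int, error) { return b.rd.Discard(n) }

func (b *bufReader) ReadFull(n int) ([]byte, error) {
	if cap(b.buf) < n {
		b.buf = make([]byte, n)
	}
	b.buf = b.buf[:n]
	if _, err := io.ReadFull(b.rd, b.buf); err != nil {
		return nil, err
	}
	return b.buf, nil
}

func main() {
	data := []byte("headerBLOB-ONEBLOB-TWO")
	readers := []discardReader{
		&byteReader{buf: data},
		&bufReader{rd: bufio.NewReader(bytes.NewReader(data))},
	}
	for _, rd := range readers {
		if _, err := rd.Discard(len("header")); err != nil { // skip the pseudo pack header
			panic(err)
		}
		for i := 0; i < 2; i++ {
			// The returned slice is only valid until the next call on rd:
			// byteReader re-slices its backing array and bufReader reuses its
			// scratch buffer. Consume it immediately, as PackBlobIterator
			// does by decrypting before the next ReadFull or Discard.
			blob, err := rd.ReadFull(len("BLOB-ONE"))
			if err != nil {
				panic(err)
			}
			fmt.Printf("%s\n", blob)
		}
	}
}
```

The same do-not-retain rule is what the new LoadBlobsFromPack doc comment imposes on handleBlobFn callers: with byteReader the callback's buf now aliases the shared pack download.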
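The other half of the change is the chunking rule added to streamPack, which bounds memory at roughly maxChunkSize per download now that each part is read fully into memory. A minimal sketch of that arithmetic under stated assumptions: a hypothetical span type and plain offset/length pairs stand in for restic.Blob, and blobs are assumed sorted and non-overlapping; the real code never materializes ranges but calls streamPackPart on blobs[lowerIdx:i]:

```go
package main

import "fmt"

// blob is a stand-in for the Offset/Length fields of restic.Blob.
type blob struct{ offset, length uint }

// span is a hypothetical helper describing one backend request.
type span struct{ start, end uint }

const (
	defaultPackSize = 16 * 1024 * 1024
	maxChunkSize    = 2 * defaultPackSize // split chunks that would reach this size
	maxUnusedRange  = 4 * 1024 * 1024     // skip gaps larger than this
)

// downloadSpans reproduces the splitting rule from streamPack: start a new
// request when the gap since the previous blob exceeds maxUnusedRange, or
// when the chunk would grow to maxChunkSize or more. The i > lowerIdx guard
// means an oversized single blob still forms a chunk of its own.
func downloadSpans(blobs []blob) []span {
	var spans []span
	lowerIdx := 0
	lastPos := blobs[0].offset
	for i := 0; i < len(blobs); i++ {
		chunkSizeAfter := (blobs[i].offset + blobs[i].length) - blobs[lowerIdx].offset
		split := i > lowerIdx && chunkSizeAfter >= maxChunkSize
		if blobs[i].offset-lastPos > maxUnusedRange {
			split = true
		}
		if split {
			// flush everything up to, but not including, blob i
			spans = append(spans, span{blobs[lowerIdx].offset, lastPos})
			lowerIdx = i
		}
		lastPos = blobs[i].offset + blobs[i].length
	}
	return append(spans, span{blobs[lowerIdx].offset, lastPos})
}

func main() {
	blobs := []blob{
		{0, 1 << 20},        // 1 MiB at the start of the pack
		{1 << 20, 1 << 20},  // adjacent: same chunk
		{8 << 20, 1 << 20},  // 6 MiB gap > maxUnusedRange: new request
		{9 << 20, 40 << 20}, // would push the chunk past 32 MiB: own chunk
	}
	for _, s := range downloadSpans(blobs) {
		fmt.Printf("download [%d, %d) (%d bytes)\n", s.start, s.end, s.end-s.start)
	}
}
```

With these inputs the sketch yields three requests, [0, 2 MiB), [8 MiB, 9 MiB) and [9 MiB, 49 MiB), matching the diff's stated trade-off that an extra request is usually cheaper than transferring unused gap bytes, while the size cap keeps the per-part allocation bounded.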