diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 3bc0fac87..e6a7a9035 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -10,6 +10,7 @@ import ( "sort" "sync" + "github.com/klauspost/compress/zstd" "github.com/minio/sha256-simd" "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/s3" @@ -526,7 +527,7 @@ func (c *Checker) GetPacks() map[restic.ID]int64 { } // checkPack reads a pack and checks the integrity of all blobs. -func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader) error { +func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error { debug.Log("checking pack %v", id.String()) if len(blobs) == 0 { @@ -557,49 +558,44 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r // calculate hash on-the-fly while reading the pack and capture pack header var hash restic.ID var hdrBuf []byte - hashingLoader := func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error { - return r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error { - hrd := hashing.NewReader(rd, sha256.New()) - bufRd.Reset(hrd) + h := backend.Handle{Type: backend.PackFile, Name: id.String()} + err := r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error { + hrd := hashing.NewReader(rd, sha256.New()) + bufRd.Reset(hrd) - // skip to start of first blob, offset == 0 for correct pack files - _, err := bufRd.Discard(int(offset)) - if err != nil { + it := repository.NewPackBlobIterator(id, bufRd, 0, blobs, r.Key(), dec) + for { + val, err := it.Next() + if err == repository.ErrPackEOF { + break + } else if err != nil { return err } - - err = fn(bufRd) - if err != nil { - return err + debug.Log(" check blob %v: %v", val.Handle.ID, val.Handle) + if val.Err != nil { + debug.Log(" error verifying blob %v: %v", val.Handle.ID, err) + errs = append(errs, errors.Errorf("blob %v: %v", val.Handle.ID, err)) } - - // skip enough bytes until we reach the possible header start - curPos := length + int(offset) - minHdrStart := int(size) - pack.MaxHeaderSize - if minHdrStart > curPos { - _, err := bufRd.Discard(minHdrStart - curPos) - if err != nil { - return err - } - } - - // read remainder, which should be the pack header - hdrBuf, err = io.ReadAll(bufRd) - if err != nil { - return err - } - - hash = restic.IDFromHash(hrd.Sum(nil)) - return nil - }) - } - - err := repository.StreamPack(ctx, hashingLoader, r.Key(), id, blobs, func(blob restic.BlobHandle, buf []byte, err error) error { - debug.Log(" check blob %v: %v", blob.ID, blob) - if err != nil { - debug.Log(" error verifying blob %v: %v", blob.ID, err) - errs = append(errs, errors.Errorf("blob %v: %v", blob.ID, err)) } + + // skip enough bytes until we reach the possible header start + curPos := lastBlobEnd + minHdrStart := int(size) - pack.MaxHeaderSize + if minHdrStart > curPos { + _, err := bufRd.Discard(minHdrStart - curPos) + if err != nil { + return err + } + } + + // read remainder, which should be the pack header + var err error + hdrBuf, err = io.ReadAll(bufRd) + if err != nil { + return err + } + + hash = restic.IDFromHash(hrd.Sum(nil)) return nil }) if err != nil { @@ -670,6 +666,11 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p // create a buffer that is large enough to be reused by repository.StreamPack // this ensures that we can read the pack header later on bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize) + dec, err := zstd.NewReader(nil) + if err != nil { + panic(dec) + } + defer dec.Close() for { var ps checkTask var ok bool @@ -683,7 +684,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p } } - err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size, bufRd) + err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size, bufRd, dec) p.Add(1) if err == nil { continue