check: Use PackBlobIterator instead of StreamPack

To only stream the content of a pack file once, check used StreamPack
with a custom pack load function. This combination was always brittle
and complicates using StreamPack everywhere else. Now that StreamPack
internally uses PackBlobIterator use that primitive instead, which is a
much better fit for what the check command requires.
This commit is contained in:
Michael Eischer 2023-12-31 10:58:26 +01:00
parent fb422497af
commit 22d0c3f8dc
1 changed files with 41 additions and 40 deletions

View File

@ -10,6 +10,7 @@ import (
"sort"
"sync"
"github.com/klauspost/compress/zstd"
"github.com/minio/sha256-simd"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/s3"
@ -526,7 +527,7 @@ func (c *Checker) GetPacks() map[restic.ID]int64 {
}
// checkPack reads a pack and checks the integrity of all blobs.
func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader) error {
func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error {
debug.Log("checking pack %v", id.String())
if len(blobs) == 0 {
@ -557,49 +558,44 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
// calculate hash on-the-fly while reading the pack and capture pack header
var hash restic.ID
var hdrBuf []byte
hashingLoader := func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error {
hrd := hashing.NewReader(rd, sha256.New())
bufRd.Reset(hrd)
h := backend.Handle{Type: backend.PackFile, Name: id.String()}
err := r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error {
hrd := hashing.NewReader(rd, sha256.New())
bufRd.Reset(hrd)
// skip to start of first blob, offset == 0 for correct pack files
_, err := bufRd.Discard(int(offset))
if err != nil {
it := repository.NewPackBlobIterator(id, bufRd, 0, blobs, r.Key(), dec)
for {
val, err := it.Next()
if err == repository.ErrPackEOF {
break
} else if err != nil {
return err
}
err = fn(bufRd)
if err != nil {
return err
debug.Log(" check blob %v: %v", val.Handle.ID, val.Handle)
if val.Err != nil {
debug.Log(" error verifying blob %v: %v", val.Handle.ID, err)
errs = append(errs, errors.Errorf("blob %v: %v", val.Handle.ID, err))
}
// skip enough bytes until we reach the possible header start
curPos := length + int(offset)
minHdrStart := int(size) - pack.MaxHeaderSize
if minHdrStart > curPos {
_, err := bufRd.Discard(minHdrStart - curPos)
if err != nil {
return err
}
}
// read remainder, which should be the pack header
hdrBuf, err = io.ReadAll(bufRd)
if err != nil {
return err
}
hash = restic.IDFromHash(hrd.Sum(nil))
return nil
})
}
err := repository.StreamPack(ctx, hashingLoader, r.Key(), id, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
debug.Log(" check blob %v: %v", blob.ID, blob)
if err != nil {
debug.Log(" error verifying blob %v: %v", blob.ID, err)
errs = append(errs, errors.Errorf("blob %v: %v", blob.ID, err))
}
// skip enough bytes until we reach the possible header start
curPos := lastBlobEnd
minHdrStart := int(size) - pack.MaxHeaderSize
if minHdrStart > curPos {
_, err := bufRd.Discard(minHdrStart - curPos)
if err != nil {
return err
}
}
// read remainder, which should be the pack header
var err error
hdrBuf, err = io.ReadAll(bufRd)
if err != nil {
return err
}
hash = restic.IDFromHash(hrd.Sum(nil))
return nil
})
if err != nil {
@ -670,6 +666,11 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
// create a buffer that is large enough to be reused by repository.StreamPack
// this ensures that we can read the pack header later on
bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize)
dec, err := zstd.NewReader(nil)
if err != nil {
panic(dec)
}
defer dec.Close()
for {
var ps checkTask
var ok bool
@ -683,7 +684,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
}
}
err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size, bufRd)
err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size, bufRd, dec)
p.Add(1)
if err == nil {
continue