diff --git a/cmd/restic/integration_test.go b/cmd/restic/integration_test.go index 42936d2ea..f926306eb 100644 --- a/cmd/restic/integration_test.go +++ b/cmd/restic/integration_test.go @@ -2135,7 +2135,4 @@ func TestBackendLoadWriteTo(t *testing.T) { firstSnapshot := testRunList(t, "snapshots", env.gopts) rtest.Assert(t, len(firstSnapshot) == 1, "expected one snapshot, got %v", firstSnapshot) - - // test readData using the hashing.Reader - testRunCheck(t, env.gopts) } diff --git a/internal/checker/checker.go b/internal/checker/checker.go index e842a08be..5262789d0 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -1,14 +1,19 @@ package checker import ( + "bufio" + "bytes" "context" "fmt" "io" - "os" + "io/ioutil" + "sort" "sync" + "github.com/minio/sha256-simd" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/hashing" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" @@ -436,78 +441,104 @@ func (c *Checker) GetPacks() map[restic.ID]int64 { } // checkPack reads a pack and checks the integrity of all blobs. -func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int64) error { - debug.Log("checking pack %v", id) - h := restic.Handle{Type: restic.PackFile, Name: id.String()} +func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader) error { + debug.Log("checking pack %v", id.String()) - packfile, hash, realSize, err := repository.DownloadAndHash(ctx, r.Backend(), h) - if err != nil { - return errors.Wrap(err, "checkPack") + if len(blobs) == 0 { + return errors.Errorf("pack %v is empty or not indexed", id) } - defer func() { - _ = packfile.Close() - _ = os.Remove(packfile.Name()) - }() + // sanity check blobs in index + sort.Slice(blobs, func(i, j int) bool { + return blobs[i].Offset < blobs[j].Offset + }) + idxHdrSize := pack.HeaderSize + len(blobs)*int(pack.EntrySize) + lastBlobEnd := 0 + nonContinuousPack := false + for _, blob := range blobs { + if lastBlobEnd != int(blob.Offset) { + nonContinuousPack = true + } + lastBlobEnd = int(blob.Offset + blob.Length) + } + // size was calculated by masterindex.PackSize, thus there's no need to recalculate it here - debug.Log("hash for pack %v is %v", id, hash) + var errs []error + if nonContinuousPack { + debug.Log("Index for pack contains gaps / overlaps, blobs: %v", blobs) + errs = append(errs, errors.New("Index for pack contains gaps / overlapping blobs")) + } + // calculate hash on-the-fly while reading the pack and capture pack header + var hash restic.ID + var hdrBuf []byte + hashingLoader := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + return r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error { + hrd := hashing.NewReader(rd, sha256.New()) + bufRd.Reset(hrd) + + // skip to start of first blob, offset == 0 for correct pack files + _, err := bufRd.Discard(int(offset)) + if err != nil { + return err + } + + err = fn(bufRd) + if err != nil { + return err + } + + // skip enough bytes until we reach the possible header start + curPos := length + int(offset) + minHdrStart := int(size) - pack.MaxHeaderSize + if minHdrStart > curPos { + _, err := bufRd.Discard(minHdrStart - curPos) + if err != nil { + return err + } + } + + // read remainder, which should be the pack header + hdrBuf, err = ioutil.ReadAll(bufRd) 
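+			// reading to EOF also pushes the remaining bytes through the
+			// hashing reader, so hrd has now seen the complete pack file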
+			if err != nil {
+				return err
+			}
+
+			hash = restic.IDFromHash(hrd.Sum(nil))
+			return nil
+		})
+	}
+
+	err := repository.StreamPack(ctx, hashingLoader, r.Key(), id, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
+		debug.Log("  check blob %v: %v", blob.ID, blob)
+		if err != nil {
+			debug.Log("  error verifying blob %v: %v", blob.ID, err)
+			errs = append(errs, errors.Errorf("blob %v: %v", blob.ID, err))
+		}
+		return nil
+	})
+	if err != nil {
+		// failed to load the pack file, return as further checks cannot succeed anyway
+		debug.Log("  error streaming pack: %v", err)
+		return errors.Errorf("pack %v failed to download: %v", id, err)
+	}
 
 	if !hash.Equal(id) {
 		debug.Log("Pack ID does not match, want %v, got %v", id, hash)
 		return errors.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str())
 	}
 
-	if realSize != size {
-		debug.Log("Pack size does not match, want %v, got %v", size, realSize)
-		return errors.Errorf("Pack size does not match, want %v, got %v", size, realSize)
-	}
-
-	blobs, hdrSize, err := pack.List(r.Key(), packfile, size)
+	blobs, hdrSize, err := pack.List(r.Key(), bytes.NewReader(hdrBuf), int64(len(hdrBuf)))
 	if err != nil {
 		return err
 	}
 
-	var errs []error
-	var buf []byte
-	sizeFromBlobs := uint(hdrSize)
+	if uint32(idxHdrSize) != hdrSize {
+		debug.Log("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize)
+		errs = append(errs, errors.Errorf("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize))
+	}
+
 	idx := r.Index()
-	for i, blob := range blobs {
-		sizeFromBlobs += blob.Length
-		debug.Log("  check blob %d: %v", i, blob)
-
-		buf = buf[:cap(buf)]
-		if uint(len(buf)) < blob.Length {
-			buf = make([]byte, blob.Length)
-		}
-		buf = buf[:blob.Length]
-
-		_, err := packfile.Seek(int64(blob.Offset), 0)
-		if err != nil {
-			return errors.Errorf("Seek(%v): %v", blob.Offset, err)
-		}
-
-		_, err = io.ReadFull(packfile, buf)
-		if err != nil {
-			debug.Log("  error loading blob %v: %v", blob.ID, err)
-			errs = append(errs, errors.Errorf("blob %v: %v", i, err))
-			continue
-		}
-
-		nonce, ciphertext := buf[:r.Key().NonceSize()], buf[r.Key().NonceSize():]
-		plaintext, err := r.Key().Open(ciphertext[:0], nonce, ciphertext, nil)
-		if err != nil {
-			debug.Log("  error decrypting blob %v: %v", blob.ID, err)
-			errs = append(errs, errors.Errorf("blob %v: %v", i, err))
-			continue
-		}
-
-		hash := restic.Hash(plaintext)
-		if !hash.Equal(blob.ID) {
-			debug.Log("  Blob ID does not match, want %v, got %v", blob.ID, hash)
-			errs = append(errs, errors.Errorf("Blob ID does not match, want %v, got %v", blob.ID.Str(), hash.Str()))
-			continue
-		}
-
+	for _, blob := range blobs {
 		// Check if blob is contained in index and position is correct
 		idxHas := false
 		for _, pb := range idx.Lookup(blob.BlobHandle) {
@@ -522,11 +553,6 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int6
 		}
 	}
 
-	if int64(sizeFromBlobs) != size {
-		debug.Log("Pack size does not match, want %v, got %v", size, sizeFromBlobs)
-		errs = append(errs, errors.Errorf("Pack size does not match, want %v, got %v", size, sizeFromBlobs))
-	}
-
 	if len(errs) > 0 {
 		return errors.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs)
 	}
@@ -544,17 +570,21 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
 	defer close(errChan)
 	g, ctx := errgroup.WithContext(ctx)
 
-	type packsize struct {
-		id   restic.ID
-		size int64
+	type checkTask struct {
+		id    restic.ID
+		size  int64
+		blobs []restic.Blob
 	}
-	ch := make(chan packsize)
+	ch := make(chan checkTask)
 
 	// run workers
 	for i := 0; i < defaultParallelism; i++ {
 		g.Go(func() error {
+			// create a buffer that is large enough to be reused by repository.StreamPack
+			// this ensures that we can read the pack header later on
+			bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize)
 			for {
-				var ps packsize
+				var ps checkTask
 				var ok bool
 
 				select {
@@ -565,7 +595,8 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
 					return nil
 				}
 			}
-			err := checkPack(ctx, c.repo, ps.id, ps.size)
+
+			err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size, bufRd)
 			p.Add(1)
 			if err == nil {
 				continue
@@ -580,10 +611,17 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
 		})
 	}
 
+	packSet := restic.NewIDSet()
+	for pack := range packs {
+		packSet.Insert(pack)
+	}
+
 	// push packs to ch
-	for pack, size := range packs {
+	for pbs := range c.repo.Index().ListPacks(ctx, packSet) {
+		size := packs[pbs.PackID]
+		debug.Log("listed %v", pbs.PackID)
 		select {
-		case ch <- packsize{id: pack, size: size}:
+		case ch <- checkTask{id: pbs.PackID, size: size, blobs: pbs.Blobs}:
 		case <-ctx.Done():
 		}
 	}
diff --git a/internal/pack/pack.go b/internal/pack/pack.go
index d679c658b..95f298acb 100644
--- a/internal/pack/pack.go
+++ b/internal/pack/pack.go
@@ -160,7 +160,8 @@ const (
 	// HeaderSize is the header's constant overhead (independent of #entries)
 	HeaderSize = headerLengthSize + crypto.Extension
 
-	maxHeaderSize = 16 * 1024 * 1024
+	// MaxHeaderSize is the maximum size of a pack header, including the header-length field
+	MaxHeaderSize = 16*1024*1024 + headerLengthSize
 	// number of header entries to download as part of header-length request
 	eagerEntries = 15
 )
@@ -199,7 +200,7 @@ func readRecords(rd io.ReaderAt, size int64, max int) ([]byte, int, error) {
 		err = InvalidFileError{Message: "header length is invalid"}
 	case int64(hlen) > size-int64(headerLengthSize):
 		err = InvalidFileError{Message: "header is larger than file"}
-	case int64(hlen) > maxHeaderSize:
+	case int64(hlen) > MaxHeaderSize-int64(headerLengthSize):
 		err = InvalidFileError{Message: "header is larger than maxHeaderSize"}
 	}
 	if err != nil {
diff --git a/internal/repository/master_index.go b/internal/repository/master_index.go
index 5e099a3a5..adc110df8 100644
--- a/internal/repository/master_index.go
+++ b/internal/repository/master_index.go
@@ -400,3 +400,42 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBla
 
 	return obsolete, err
 }
+
+// ListPacks returns the blobs of the specified pack files grouped by pack file.
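+// The returned channel is closed once all packs have been sent or the context is
+// cancelled. To bound memory usage, packs are handled in 16 batches keyed on the
+// first four bits of the pack ID, at the cost of one index pass per batch.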
+func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan restic.PackBlobs {
+	out := make(chan restic.PackBlobs)
+	go func() {
+		defer close(out)
+		// only re-sort a part of the index at a time to keep the memory overhead bounded
+		for i := byte(0); i < 16; i++ {
+			if ctx.Err() != nil {
+				return
+			}
+
+			packBlob := make(map[restic.ID][]restic.Blob)
+			for pack := range packs {
+				if pack[0]&0xf == i {
+					packBlob[pack] = nil
+				}
+			}
+			if len(packBlob) == 0 {
+				continue
+			}
+			for pb := range mi.Each(ctx) {
+				if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i {
+					packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob)
+				}
+			}
+
+			// pass on packs
+			for packID, pbs := range packBlob {
+				select {
+				case out <- restic.PackBlobs{PackID: packID, Blobs: pbs}:
+				case <-ctx.Done():
+					return
+				}
+			}
+		}
+	}()
+	return out
+}
diff --git a/internal/repository/repack.go b/internal/repository/repack.go
index 423f3c831..0734c8206 100644
--- a/internal/repository/repack.go
+++ b/internal/repository/repack.go
@@ -2,13 +2,9 @@ package repository
 
 import (
 	"context"
-	"os"
 	"sync"
 
 	"github.com/restic/restic/internal/debug"
-	"github.com/restic/restic/internal/errors"
-	"github.com/restic/restic/internal/fs"
-	"github.com/restic/restic/internal/pack"
 	"github.com/restic/restic/internal/restic"
 	"github.com/restic/restic/internal/ui/progress"
 
@@ -27,147 +23,63 @@ const numRepackWorkers = 8
 func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
 	debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
 
+	var keepMutex sync.Mutex
 	wg, wgCtx := errgroup.WithContext(ctx)
 
-	downloadQueue := make(chan restic.ID)
+	downloadQueue := make(chan restic.PackBlobs)
 	wg.Go(func() error {
 		defer close(downloadQueue)
-		for packID := range packs {
-			select {
-			case downloadQueue <- packID:
-			case <-wgCtx.Done():
-				return wgCtx.Err()
-			}
-		}
-		return nil
-	})
-
-	type repackJob struct {
-		tempfile   *os.File
-		hash       restic.ID
-		packLength int64
-	}
-	processQueue := make(chan repackJob)
-	// used to close processQueue once all downloaders have finished
-	var downloadWG sync.WaitGroup
-
-	downloader := func() error {
-		defer downloadWG.Done()
-		for packID := range downloadQueue {
-			// load the complete pack into a temp file
-			h := restic.Handle{Type: restic.PackFile, Name: packID.String()}
-
-			tempfile, hash, packLength, err := DownloadAndHash(wgCtx, repo.Backend(), h)
-			if err != nil {
-				return errors.Wrap(err, "Repack")
-			}
-
-			debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash)
-
-			if !packID.Equal(hash) {
-				return errors.Errorf("hash does not match id: want %v, got %v", packID, hash)
-			}
-
-			select {
-			case processQueue <- repackJob{tempfile, hash, packLength}:
-			case <-wgCtx.Done():
-				return wgCtx.Err()
-			}
-		}
-		return nil
-	}
-
-	downloadWG.Add(numRepackWorkers)
-	for i := 0; i < numRepackWorkers; i++ {
-		wg.Go(downloader)
-	}
-	wg.Go(func() error {
-		downloadWG.Wait()
-		close(processQueue)
-		return nil
-	})
-
-	var keepMutex sync.Mutex
-	worker := func() error {
-		for job := range processQueue {
-			tempfile, packID, packLength := job.tempfile, job.hash, job.packLength
-
-			blobs, _, err := pack.List(repo.Key(), tempfile, packLength)
-			if err != nil {
-				return err
-			}
-
-			debug.Log("processing pack %v, blobs: %v", packID, len(blobs))
-			var buf []byte
-			for _, entry := range blobs {
+		for pbs := range repo.Index().ListPacks(wgCtx, packs) {
+			var packBlobs []restic.Blob
+			
keepMutex.Lock() + // filter out unnecessary blobs + for _, entry := range pbs.Blobs { h := restic.BlobHandle{ID: entry.ID, Type: entry.Type} - - keepMutex.Lock() - shouldKeep := keepBlobs.Has(h) - keepMutex.Unlock() - - if !shouldKeep { - continue + if keepBlobs.Has(h) { + packBlobs = append(packBlobs, entry) } + } + keepMutex.Unlock() - debug.Log(" process blob %v", h) + select { + case downloadQueue <- restic.PackBlobs{PackID: pbs.PackID, Blobs: packBlobs}: + case <-wgCtx.Done(): + return wgCtx.Err() + } + } + return nil + }) - if uint(cap(buf)) < entry.Length { - buf = make([]byte, entry.Length) - } - buf = buf[:entry.Length] - - n, err := tempfile.ReadAt(buf, int64(entry.Offset)) - if err != nil { - return errors.Wrap(err, "ReadAt") - } - - if n != len(buf) { - return errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v", - h, tempfile.Name(), len(buf), n) - } - - nonce, ciphertext := buf[:repo.Key().NonceSize()], buf[repo.Key().NonceSize():] - plaintext, err := repo.Key().Open(ciphertext[:0], nonce, ciphertext, nil) + worker := func() error { + for t := range downloadQueue { + err := StreamPack(wgCtx, repo.Backend().Load, repo.Key(), t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error { if err != nil { return err } - id := restic.Hash(plaintext) - if !id.Equal(entry.ID) { - debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v", - h.Type, h.ID, tempfile.Name(), id) - return errors.Errorf("read blob %v from %v: wrong data returned, hash is %v", - h, tempfile.Name(), id) - } - keepMutex.Lock() // recheck whether some other worker was faster - shouldKeep = keepBlobs.Has(h) + shouldKeep := keepBlobs.Has(blob) if shouldKeep { - keepBlobs.Delete(h) + keepBlobs.Delete(blob) } keepMutex.Unlock() if !shouldKeep { - continue + return nil } // We do want to save already saved blobs! - _, _, err = repo.SaveBlob(wgCtx, entry.Type, plaintext, entry.ID, true) + _, _, err = repo.SaveBlob(wgCtx, blob.Type, buf, blob.ID, true) if err != nil { return err } - debug.Log(" saved blob %v", entry.ID) - } - - if err = tempfile.Close(); err != nil { - return errors.Wrap(err, "Close") - } - - if err = fs.RemoveIfExists(tempfile.Name()); err != nil { - return errors.Wrap(err, "Remove") + debug.Log(" saved blob %v", blob.ID) + return nil + }) + if err != nil { + return err } p.Add(1) } diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 1ed65083c..39953cd88 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -1,30 +1,32 @@ package repository import ( + "bufio" "bytes" "context" "encoding/json" "fmt" "io" "os" + "sort" "sync" + "github.com/cenkalti/backoff/v4" "github.com/restic/chunker" "github.com/restic/restic/internal/backend/dryrun" "github.com/restic/restic/internal/cache" "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/fs" - "github.com/restic/restic/internal/hashing" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/ui/progress" - "github.com/minio/sha256-simd" "golang.org/x/sync/errgroup" ) +const MaxStreamBufferSize = 4 * 1024 * 1024 + // Repository is used to access a repository in a backend. 
 type Repository struct {
 	be      restic.Backend
@@ -742,43 +744,100 @@ type Loader interface {
 	Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
 }
 
-// DownloadAndHash is all-in-one helper to download content of the file at h to a temporary filesystem location
-// and calculate ID of the contents. Returned (temporary) file is positioned at the beginning of the file;
-// it is the reponsibility of the caller to close and delete the file.
-func DownloadAndHash(ctx context.Context, be Loader, h restic.Handle) (tmpfile *os.File, hash restic.ID, size int64, err error) {
-	tmpfile, err = fs.TempFile("", "restic-temp-")
-	if err != nil {
-		return nil, restic.ID{}, -1, errors.Wrap(err, "TempFile")
+type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
+
+// StreamPack loads the listed blobs from the specified pack file. For each blob, the handleBlobFn
+// callback is passed either the plaintext or an error if decryption failed or the blob hash does
+// not match. In case of download errors handleBlobFn might be called multiple times for the same
+// blob. If the callback returns an error, then StreamPack will abort and not retry it.
+func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
+	if len(blobs) == 0 {
+		// nothing to do
+		return nil
 	}
 
-	err = be.Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) {
-		_, ierr = tmpfile.Seek(0, io.SeekStart)
-		if ierr == nil {
-			ierr = tmpfile.Truncate(0)
-		}
-		if ierr != nil {
-			return ierr
-		}
-		hrd := hashing.NewReader(rd, sha256.New())
-		size, ierr = io.Copy(tmpfile, hrd)
-		hash = restic.IDFromHash(hrd.Sum(nil))
-		return ierr
+	sort.Slice(blobs, func(i, j int) bool {
+		return blobs[i].Offset < blobs[j].Offset
 	})
+	h := restic.Handle{Type: restic.PackFile, Name: packID.String(), ContainedBlobType: restic.DataBlob}
 
-	if err != nil {
-		// ignore subsequent errors
-		_ = tmpfile.Close()
-		_ = os.Remove(tmpfile.Name())
-		return nil, restic.ID{}, -1, errors.Wrap(err, "Load")
-	}
+	dataStart := blobs[0].Offset
+	dataEnd := blobs[len(blobs)-1].Offset + blobs[len(blobs)-1].Length
 
-	_, err = tmpfile.Seek(0, io.SeekStart)
-	if err != nil {
-		// ignore subsequent errors
-		_ = tmpfile.Close()
-		_ = os.Remove(tmpfile.Name())
-		return nil, restic.ID{}, -1, errors.Wrap(err, "Seek")
-	}
+	debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs))
 
-	return tmpfile, hash, size, err
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	// stream blobs in pack
+	err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
+		// prevent callbacks after cancelation
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
+		bufferSize := int(dataEnd - dataStart)
+		if bufferSize > MaxStreamBufferSize {
+			bufferSize = MaxStreamBufferSize
+		}
+		// create reader here to allow reusing the buffered reader from checker.checkData
+		bufRd := bufio.NewReaderSize(rd, bufferSize)
+		currentBlobEnd := dataStart
+		var buf []byte
+		for _, entry := range blobs {
+			skipBytes := int(entry.Offset - currentBlobEnd)
+			if skipBytes < 0 {
+				return errors.Errorf("overlapping blobs in pack %v", packID)
+			}
+
+			_, err := bufRd.Discard(skipBytes)
+			if err != nil {
+				return err
+			}
+
+			h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
+			debug.Log("  process blob %v, skipped %d, %v", h, skipBytes, entry)
+
+			if 
uint(cap(buf)) < entry.Length { + buf = make([]byte, entry.Length) + } + buf = buf[:entry.Length] + + n, err := io.ReadFull(bufRd, buf) + if err != nil { + debug.Log(" read error %v", err) + return errors.Wrap(err, "ReadFull") + } + + if n != len(buf) { + return errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v", + h, packID.Str(), len(buf), n) + } + currentBlobEnd = entry.Offset + entry.Length + + if int(entry.Length) <= key.NonceSize() { + debug.Log("%v", blobs) + return errors.Errorf("invalid blob length %v", entry) + } + + // decryption errors are likely permanent, give the caller a chance to skip them + nonce, ciphertext := buf[:key.NonceSize()], buf[key.NonceSize():] + plaintext, err := key.Open(ciphertext[:0], nonce, ciphertext, nil) + if err == nil { + id := restic.Hash(plaintext) + if !id.Equal(entry.ID) { + debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v", + h.Type, h.ID, packID.Str(), id) + err = errors.Errorf("read blob %v from %v: wrong data returned, hash is %v", + h, packID.Str(), id) + } + } + + err = handleBlobFn(entry.BlobHandle, plaintext, err) + if err != nil { + cancel() + return backoff.Permanent(err) + } + } + return nil + }) + return errors.Wrap(err, "StreamPack") } diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index 22a424556..167ffa535 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -4,19 +4,22 @@ import ( "bytes" "context" "crypto/sha256" + "encoding/json" "fmt" "io" "math/rand" "os" "path/filepath" + "strings" "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/restic/restic/internal/archiver" - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/fs" + "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/test" rtest "github.com/restic/restic/internal/test" ) @@ -410,108 +413,202 @@ func TestRepositoryIncrementalIndex(t *testing.T) { t.Errorf("pack %v listed in %d indexes\n", packID, len(ids)) } } + } -type backend struct { - rd io.Reader -} +// buildPackfileWithoutHeader returns a manually built pack file without a header. +func buildPackfileWithoutHeader(t testing.TB, blobSizes []int, key *crypto.Key) (blobs []restic.Blob, packfile []byte) { + var offset uint + for i, size := range blobSizes { + plaintext := test.Random(800+i, size) -func (be backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { - return fn(be.rd) -} + // we use a deterministic nonce here so the whole process is + // deterministic, last byte is the blob index + var nonce = []byte{ + 0x15, 0x98, 0xc0, 0xf7, 0xb9, 0x65, 0x97, 0x74, + 0x12, 0xdc, 0xd3, 0x62, 0xa9, 0x6e, 0x20, byte(i), + } -type retryBackend struct { - buf []byte -} + before := len(packfile) + packfile = append(packfile, nonce...) 
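+		// Seal appends the encrypted blob to packfile, so each blob ends up in
+		// restic's on-disk layout: nonce || ciphertext || MAC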
+ packfile = key.Seal(packfile, nonce, plaintext, nil) + after := len(packfile) -func (be retryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { - err := fn(bytes.NewReader(be.buf[:len(be.buf)/2])) - if err != nil { - return err + ciphertextLength := after - before + + blobs = append(blobs, restic.Blob{ + BlobHandle: restic.BlobHandle{ + ID: restic.Hash(plaintext), + Type: restic.DataBlob, + }, + Length: uint(ciphertextLength), + Offset: offset, + }) + + offset = uint(len(packfile)) } - return fn(bytes.NewReader(be.buf)) + return blobs, packfile } -func TestDownloadAndHash(t *testing.T) { - buf := make([]byte, 5*1024*1024+881) - _, err := io.ReadFull(rnd, buf) +func TestStreamPack(t *testing.T) { + // always use the same key for deterministic output + const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}` + + var key crypto.Key + err := json.Unmarshal([]byte(jsonKey), &key) if err != nil { t.Fatal(err) } - var tests = []struct { - be repository.Loader - want []byte - }{ - { - be: backend{rd: bytes.NewReader(buf)}, - want: buf, - }, - { - be: retryBackend{buf: buf}, - want: buf, - }, + blobSizes := []int{ + 10, + 5231, + 18812, + 123123, + 12301, + 892242, + 28616, + 13351, + 252287, + 188883, + 2522811, + 18883, } - for _, test := range tests { - t.Run("", func(t *testing.T) { - f, id, size, err := repository.DownloadAndHash(context.TODO(), test.be, restic.Handle{}) - if err != nil { - t.Error(err) - } + packfileBlobs, packfile := buildPackfileWithoutHeader(t, blobSizes, &key) - want := restic.Hash(test.want) - if !want.Equal(id) { - t.Errorf("wrong hash returned, want %v, got %v", want.Str(), id.Str()) - } + load := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + data := packfile - if size != int64(len(test.want)) { - t.Errorf("wrong size returned, want %v, got %v", test.want, size) - } + if offset > int64(len(data)) { + offset = 0 + length = 0 + } + data = data[offset:] - err = f.Close() - if err != nil { - t.Error(err) - } + if length > len(data) { + length = len(data) + } - err = fs.RemoveIfExists(f.Name()) - if err != nil { - t.Fatal(err) - } - }) - } -} + data = data[:length] -type errorReader struct { - err error -} + return fn(bytes.NewReader(data)) -func (er errorReader) Read(p []byte) (n int, err error) { - return 0, er.err -} - -func TestDownloadAndHashErrors(t *testing.T) { - var tests = []struct { - be repository.Loader - err string - }{ - { - be: backend{rd: errorReader{errors.New("test error 1")}}, - err: "test error 1", - }, } - for _, test := range tests { - t.Run("", func(t *testing.T) { - _, _, _, err := repository.DownloadAndHash(context.TODO(), test.be, restic.Handle{}) - if err == nil { - t.Fatalf("wanted error %q, got nil", test.err) - } + // first, test regular usage + t.Run("regular", func(t *testing.T) { + tests := []struct { + blobs []restic.Blob + }{ + {packfileBlobs[1:2]}, + {packfileBlobs[2:5]}, + {packfileBlobs[2:8]}, + {[]restic.Blob{ + packfileBlobs[0], + packfileBlobs[8], + packfileBlobs[4], + }}, + {[]restic.Blob{ + packfileBlobs[0], + packfileBlobs[len(packfileBlobs)-1], + }}, + } - if errors.Cause(err).Error() != test.err { - t.Fatalf("wanted error %q, got %q", test.err, err) - } - }) - } + for _, test := range tests { + t.Run("", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + gotBlobs := 
make(map[restic.ID]int) + + handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error { + gotBlobs[blob.ID]++ + + id := restic.Hash(buf) + if !id.Equal(blob.ID) { + t.Fatalf("wrong id %v for blob %s returned", id, blob.ID) + } + + return err + } + + wantBlobs := make(map[restic.ID]int) + for _, blob := range test.blobs { + wantBlobs[blob.ID] = 1 + } + + err = repository.StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob) + if err != nil { + t.Fatal(err) + } + + if !cmp.Equal(wantBlobs, gotBlobs) { + t.Fatal(cmp.Diff(wantBlobs, gotBlobs)) + } + }) + } + }) + + // next, test invalid uses, which should return an error + t.Run("invalid", func(t *testing.T) { + tests := []struct { + blobs []restic.Blob + err string + }{ + { + // pass one blob several times + blobs: []restic.Blob{ + packfileBlobs[3], + packfileBlobs[8], + packfileBlobs[3], + packfileBlobs[4], + }, + err: "overlapping blobs in pack", + }, + + { + // pass something that's not a valid blob in the current pack file + blobs: []restic.Blob{ + { + Offset: 123, + Length: 20000, + }, + }, + err: "ciphertext verification failed", + }, + + { + // pass a blob that's too small + blobs: []restic.Blob{ + { + Offset: 123, + Length: 10, + }, + }, + err: "invalid blob length", + }, + } + + for _, test := range tests { + t.Run("", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error { + return err + } + + err = repository.StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob) + if err == nil { + t.Fatalf("wanted error %v, got nil", test.err) + } + + if !strings.Contains(err.Error(), test.err) { + t.Fatalf("wrong error returned, it should contain %q but was %q", test.err, err) + } + }) + } + }) } diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 38c611ce6..c2a6e9f74 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -60,6 +60,11 @@ type Lister interface { List(context.Context, FileType, func(FileInfo) error) error } +type PackBlobs struct { + PackID ID + Blobs []Blob +} + // MasterIndex keeps track of the blobs are stored within files. type MasterIndex interface { Has(BlobHandle) bool @@ -71,4 +76,5 @@ type MasterIndex interface { // the context is cancelled, the background goroutine terminates. This // blocks any modification of the index. 
Each(ctx context.Context) <-chan PackedBlob + ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs } diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index d3d52f13a..84c7834df 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -1,12 +1,8 @@ package restorer import ( - "bufio" "context" - "io" - "math" "path/filepath" - "sort" "sync" "golang.org/x/sync/errgroup" @@ -14,6 +10,7 @@ import ( "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" ) @@ -52,7 +49,7 @@ type packInfo struct { type fileRestorer struct { key *crypto.Key idx func(restic.BlobHandle) []restic.PackedBlob - packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error + packLoader repository.BackendLoadFn filesWriter *filesWriter @@ -62,7 +59,7 @@ type fileRestorer struct { } func newFileRestorer(dst string, - packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error, + packLoader repository.BackendLoadFn, key *crypto.Key, idx func(restic.BlobHandle) []restic.PackedBlob) *fileRestorer { @@ -120,7 +117,7 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error { err := r.forEachBlob(fileBlobs, func(packID restic.ID, blob restic.Blob) { if largeFile { packsMap[packID] = append(packsMap[packID], fileBlobInfo{id: blob.ID, offset: fileOffset}) - fileOffset += int64(blob.Length) - crypto.Extension + fileOffset += int64(restic.PlaintextLength(int(blob.Length))) } pack, ok := packs[packID] if !ok { @@ -175,30 +172,19 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error { return wg.Wait() } -const maxBufferSize = 4 * 1024 * 1024 - func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { - // calculate pack byte range and blob->[]files->[]offsets mappings - start, end := int64(math.MaxInt64), int64(0) + // calculate blob->[]files->[]offsets mappings blobs := make(map[restic.ID]struct { - offset int64 // offset of the blob in the pack - length int // length of the blob - files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file + files map[*fileInfo][]int64 // file -> offsets (plural!) 
of the blob in the file }) + var blobList []restic.Blob for file := range pack.files { addBlob := func(blob restic.Blob, fileOffset int64) { - if start > int64(blob.Offset) { - start = int64(blob.Offset) - } - if end < int64(blob.Offset+blob.Length) { - end = int64(blob.Offset + blob.Length) - } blobInfo, ok := blobs[blob.ID] if !ok { - blobInfo.offset = int64(blob.Offset) - blobInfo.length = int(blob.Length) blobInfo.files = make(map[*fileInfo][]int64) + blobList = append(blobList, blob) blobs[blob.ID] = blobInfo } blobInfo.files[file] = append(blobInfo.files[file], fileOffset) @@ -228,14 +214,6 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { } } - sortedBlobs := make([]restic.ID, 0, len(blobs)) - for blobID := range blobs { - sortedBlobs = append(sortedBlobs, blobID) - } - sort.Slice(sortedBlobs, func(i, j int) bool { - return blobs[sortedBlobs[i]].offset < blobs[sortedBlobs[j]].offset - }) - sanitizeError := func(file *fileInfo, err error) error { if err != nil { err = r.Error(file.location, err) @@ -243,59 +221,39 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { return err } - h := restic.Handle{Type: restic.PackFile, Name: pack.id.String(), ContainedBlobType: restic.DataBlob} - err := r.packLoader(ctx, h, int(end-start), start, func(rd io.Reader) error { - bufferSize := int(end - start) - if bufferSize > maxBufferSize { - bufferSize = maxBufferSize - } - bufRd := bufio.NewReaderSize(rd, bufferSize) - currentBlobEnd := start - var blobData, buf []byte - for _, blobID := range sortedBlobs { - blob := blobs[blobID] - _, err := bufRd.Discard(int(blob.offset - currentBlobEnd)) - if err != nil { - return err - } - buf, err = r.downloadBlob(bufRd, blobID, blob.length, buf) - if err != nil { - return err - } - blobData, err = r.decryptBlob(blobID, buf) - if err != nil { - for file := range blob.files { - if errFile := sanitizeError(file, err); errFile != nil { - return errFile - } + err := repository.StreamPack(ctx, r.packLoader, r.key, pack.id, blobList, func(h restic.BlobHandle, blobData []byte, err error) error { + blob := blobs[h.ID] + if err != nil { + for file := range blob.files { + if errFile := sanitizeError(file, err); errFile != nil { + return errFile } - continue } - currentBlobEnd = blob.offset + int64(blob.length) - for file, offsets := range blob.files { - for _, offset := range offsets { - writeToFile := func() error { - // this looks overly complicated and needs explanation - // two competing requirements: - // - must create the file once and only once - // - should allow concurrent writes to the file - // so write the first blob while holding file lock - // write other blobs after releasing the lock - createSize := int64(-1) - file.lock.Lock() - if file.inProgress { - file.lock.Unlock() - } else { - defer file.lock.Unlock() - file.inProgress = true - createSize = file.size - } - return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize) - } - err := sanitizeError(file, writeToFile()) - if err != nil { - return err + return nil + } + for file, offsets := range blob.files { + for _, offset := range offsets { + writeToFile := func() error { + // this looks overly complicated and needs explanation + // two competing requirements: + // - must create the file once and only once + // - should allow concurrent writes to the file + // so write the first blob while holding file lock + // write other blobs after releasing the lock + createSize := int64(-1) + file.lock.Lock() + if 
file.inProgress { + file.lock.Unlock() + } else { + defer file.lock.Unlock() + file.inProgress = true + createSize = file.size } + return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize) + } + err := sanitizeError(file, writeToFile()) + if err != nil { + return err } } } @@ -312,41 +270,3 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { return nil } - -func (r *fileRestorer) downloadBlob(rd io.Reader, blobID restic.ID, length int, buf []byte) ([]byte, error) { - // TODO reconcile with Repository#loadBlob implementation - - if cap(buf) < length { - buf = make([]byte, length) - } else { - buf = buf[:length] - } - - n, err := io.ReadFull(rd, buf) - if err != nil { - return nil, err - } - - if n != length { - return nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blobID.Str(), length, n) - } - return buf, nil -} - -func (r *fileRestorer) decryptBlob(blobID restic.ID, buf []byte) ([]byte, error) { - // TODO reconcile with Repository#loadBlob implementation - - // decrypt - nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():] - plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil) - if err != nil { - return nil, errors.Errorf("decrypting blob %v failed: %v", blobID, err) - } - - // check hash - if !restic.Hash(plaintext).Equal(blobID) { - return nil, errors.Errorf("blob %v returned invalid hash", blobID) - } - - return plaintext, nil -} diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go index 333420b70..f5760f54a 100644 --- a/internal/restorer/filerestorer_test.go +++ b/internal/restorer/filerestorer_test.go @@ -10,6 +10,7 @@ import ( "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" ) @@ -38,7 +39,7 @@ type TestRepo struct { filesPathToContent map[string]string // - loader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error + loader repository.BackendLoadFn } func (i *TestRepo) Lookup(bh restic.BlobHandle) []restic.PackedBlob { @@ -267,7 +268,7 @@ func TestErrorRestoreFiles(t *testing.T) { r.files = repo.files err := r.restoreFiles(context.TODO()) - rtest.Equals(t, loadError, err) + rtest.Assert(t, errors.Is(err, loadError), "got %v, expected contained error %v", err, loadError) } func TestDownloadError(t *testing.T) {
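
Reviewer note: a minimal sketch of how the pieces introduced here fit together. The helper below is hypothetical and not part of this diff; it assumes an initialized restic.Repository plus the usual imports (context, fmt, internal/repository, internal/restic). It collects the indexed blobs of a single pack via MasterIndex.ListPacks, then streams and verifies them with repository.StreamPack.

func printPackBlobs(ctx context.Context, repo restic.Repository, packID restic.ID) error {
	// gather the blobs the index lists for this pack; StreamPack needs them
	// to know which byte ranges to fetch and how to verify each plaintext
	var blobs []restic.Blob
	for pbs := range repo.Index().ListPacks(ctx, restic.NewIDSet(packID)) {
		blobs = append(blobs, pbs.Blobs...)
	}

	return repository.StreamPack(ctx, repo.Backend().Load, repo.Key(), packID, blobs,
		func(blob restic.BlobHandle, buf []byte, err error) error {
			if err != nil {
				// returning the error aborts the stream without further retries
				return err
			}
			fmt.Printf("%v %v: %d plaintext bytes\n", blob.Type, blob.ID.Str(), len(buf))
			return nil
		})
}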