Merge pull request #3484 from MichaelEischer/stream-check-repack

Stream packs in `check --read-data` and during repacking
This commit is contained in:
Alexander Neumann 2022-03-26 20:46:17 +01:00 committed by GitHub
commit 4d5db61bd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 502 additions and 433 deletions

View File

@ -2135,7 +2135,4 @@ func TestBackendLoadWriteTo(t *testing.T) {
firstSnapshot := testRunList(t, "snapshots", env.gopts)
rtest.Assert(t, len(firstSnapshot) == 1,
"expected one snapshot, got %v", firstSnapshot)
// test readData using the hashing.Reader
testRunCheck(t, env.gopts)
}

View File

@ -1,14 +1,19 @@
package checker
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"os"
"io/ioutil"
"sort"
"sync"
"github.com/minio/sha256-simd"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/hashing"
"github.com/restic/restic/internal/pack"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
@ -436,78 +441,104 @@ func (c *Checker) GetPacks() map[restic.ID]int64 {
}
// checkPack reads a pack and checks the integrity of all blobs.
func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int64) error {
debug.Log("checking pack %v", id)
h := restic.Handle{Type: restic.PackFile, Name: id.String()}
func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader) error {
debug.Log("checking pack %v", id.String())
packfile, hash, realSize, err := repository.DownloadAndHash(ctx, r.Backend(), h)
if err != nil {
return errors.Wrap(err, "checkPack")
if len(blobs) == 0 {
return errors.Errorf("pack %v is empty or not indexed", id)
}
defer func() {
_ = packfile.Close()
_ = os.Remove(packfile.Name())
}()
// sanity check blobs in index
sort.Slice(blobs, func(i, j int) bool {
return blobs[i].Offset < blobs[j].Offset
})
idxHdrSize := pack.HeaderSize + len(blobs)*int(pack.EntrySize)
lastBlobEnd := 0
nonContinuousPack := false
for _, blob := range blobs {
if lastBlobEnd != int(blob.Offset) {
nonContinuousPack = true
}
lastBlobEnd = int(blob.Offset + blob.Length)
}
// size was calculated by masterindex.PackSize, thus there's no need to recalculate it here
debug.Log("hash for pack %v is %v", id, hash)
var errs []error
if nonContinuousPack {
debug.Log("Index for pack contains gaps / overlaps, blobs: %v", blobs)
errs = append(errs, errors.New("Index for pack contains gaps / overlapping blobs"))
}
// calculate hash on-the-fly while reading the pack and capture pack header
var hash restic.ID
var hdrBuf []byte
hashingLoader := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error {
hrd := hashing.NewReader(rd, sha256.New())
bufRd.Reset(hrd)
// skip to start of first blob, offset == 0 for correct pack files
_, err := bufRd.Discard(int(offset))
if err != nil {
return err
}
err = fn(bufRd)
if err != nil {
return err
}
// skip enough bytes until we reach the possible header start
curPos := length + int(offset)
minHdrStart := int(size) - pack.MaxHeaderSize
if minHdrStart > curPos {
_, err := bufRd.Discard(minHdrStart - curPos)
if err != nil {
return err
}
}
// read remainder, which should be the pack header
hdrBuf, err = ioutil.ReadAll(bufRd)
if err != nil {
return err
}
hash = restic.IDFromHash(hrd.Sum(nil))
return nil
})
}
err := repository.StreamPack(ctx, hashingLoader, r.Key(), id, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
debug.Log(" check blob %v: %v", blob.ID, blob)
if err != nil {
debug.Log(" error verifying blob %v: %v", blob.ID, err)
errs = append(errs, errors.Errorf("blob %v: %v", blob.ID, err))
}
return nil
})
if err != nil {
// failed to load the pack file, return as further checks cannot succeed anyways
debug.Log(" error streaming pack: %v", err)
return errors.Errorf("pack %v failed to download: %v", err)
}
if !hash.Equal(id) {
debug.Log("Pack ID does not match, want %v, got %v", id, hash)
return errors.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str())
}
if realSize != size {
debug.Log("Pack size does not match, want %v, got %v", size, realSize)
return errors.Errorf("Pack size does not match, want %v, got %v", size, realSize)
}
blobs, hdrSize, err := pack.List(r.Key(), packfile, size)
blobs, hdrSize, err := pack.List(r.Key(), bytes.NewReader(hdrBuf), int64(len(hdrBuf)))
if err != nil {
return err
}
var errs []error
var buf []byte
sizeFromBlobs := uint(hdrSize)
if uint32(idxHdrSize) != hdrSize {
debug.Log("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize)
errs = append(errs, errors.Errorf("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize))
}
idx := r.Index()
for i, blob := range blobs {
sizeFromBlobs += blob.Length
debug.Log(" check blob %d: %v", i, blob)
buf = buf[:cap(buf)]
if uint(len(buf)) < blob.Length {
buf = make([]byte, blob.Length)
}
buf = buf[:blob.Length]
_, err := packfile.Seek(int64(blob.Offset), 0)
if err != nil {
return errors.Errorf("Seek(%v): %v", blob.Offset, err)
}
_, err = io.ReadFull(packfile, buf)
if err != nil {
debug.Log(" error loading blob %v: %v", blob.ID, err)
errs = append(errs, errors.Errorf("blob %v: %v", i, err))
continue
}
nonce, ciphertext := buf[:r.Key().NonceSize()], buf[r.Key().NonceSize():]
plaintext, err := r.Key().Open(ciphertext[:0], nonce, ciphertext, nil)
if err != nil {
debug.Log(" error decrypting blob %v: %v", blob.ID, err)
errs = append(errs, errors.Errorf("blob %v: %v", i, err))
continue
}
hash := restic.Hash(plaintext)
if !hash.Equal(blob.ID) {
debug.Log(" Blob ID does not match, want %v, got %v", blob.ID, hash)
errs = append(errs, errors.Errorf("Blob ID does not match, want %v, got %v", blob.ID.Str(), hash.Str()))
continue
}
for _, blob := range blobs {
// Check if blob is contained in index and position is correct
idxHas := false
for _, pb := range idx.Lookup(blob.BlobHandle) {
@ -522,11 +553,6 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int6
}
}
if int64(sizeFromBlobs) != size {
debug.Log("Pack size does not match, want %v, got %v", size, sizeFromBlobs)
errs = append(errs, errors.Errorf("Pack size does not match, want %v, got %v", size, sizeFromBlobs))
}
if len(errs) > 0 {
return errors.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs)
}
@ -544,17 +570,21 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
defer close(errChan)
g, ctx := errgroup.WithContext(ctx)
type packsize struct {
id restic.ID
size int64
type checkTask struct {
id restic.ID
size int64
blobs []restic.Blob
}
ch := make(chan packsize)
ch := make(chan checkTask)
// run workers
for i := 0; i < defaultParallelism; i++ {
g.Go(func() error {
// create a buffer that is large enough to be reused by repository.StreamPack
// this ensures that we can read the pack header later on
bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize)
for {
var ps packsize
var ps checkTask
var ok bool
select {
@ -565,7 +595,8 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
return nil
}
}
err := checkPack(ctx, c.repo, ps.id, ps.size)
err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size, bufRd)
p.Add(1)
if err == nil {
continue
@ -580,10 +611,17 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
})
}
packSet := restic.NewIDSet()
for pack := range packs {
packSet.Insert(pack)
}
// push packs to ch
for pack, size := range packs {
for pbs := range c.repo.Index().ListPacks(ctx, packSet) {
size := packs[pbs.PackID]
debug.Log("listed %v", pbs.PackID)
select {
case ch <- packsize{id: pack, size: size}:
case ch <- checkTask{id: pbs.PackID, size: size, blobs: pbs.Blobs}:
case <-ctx.Done():
}
}

View File

@ -160,7 +160,8 @@ const (
// HeaderSize is the header's constant overhead (independent of #entries)
HeaderSize = headerLengthSize + crypto.Extension
maxHeaderSize = 16 * 1024 * 1024
// MaxHeaderSize is the max size of header including header-length field
MaxHeaderSize = 16*1024*1024 + headerLengthSize
// number of header enries to download as part of header-length request
eagerEntries = 15
)
@ -199,7 +200,7 @@ func readRecords(rd io.ReaderAt, size int64, max int) ([]byte, int, error) {
err = InvalidFileError{Message: "header length is invalid"}
case int64(hlen) > size-int64(headerLengthSize):
err = InvalidFileError{Message: "header is larger than file"}
case int64(hlen) > maxHeaderSize:
case int64(hlen) > MaxHeaderSize-int64(headerLengthSize):
err = InvalidFileError{Message: "header is larger than maxHeaderSize"}
}
if err != nil {

View File

@ -400,3 +400,42 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBla
return obsolete, err
}
// ListPacks returns the blobs of the specified pack files grouped by pack file.
func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan restic.PackBlobs {
out := make(chan restic.PackBlobs)
go func() {
defer close(out)
// only resort a part of the index to keep the memory overhead bounded
for i := byte(0); i < 16; i++ {
if ctx.Err() != nil {
return
}
packBlob := make(map[restic.ID][]restic.Blob)
for pack := range packs {
if pack[0]&0xf == i {
packBlob[pack] = nil
}
}
if len(packBlob) == 0 {
continue
}
for pb := range mi.Each(ctx) {
if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i {
packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob)
}
}
// pass on packs
for packID, pbs := range packBlob {
select {
case out <- restic.PackBlobs{PackID: packID, Blobs: pbs}:
case <-ctx.Done():
return
}
}
}
}()
return out
}

View File

@ -2,13 +2,9 @@ package repository
import (
"context"
"os"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/pack"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
@ -27,147 +23,63 @@ const numRepackWorkers = 8
func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
var keepMutex sync.Mutex
wg, wgCtx := errgroup.WithContext(ctx)
downloadQueue := make(chan restic.ID)
downloadQueue := make(chan restic.PackBlobs)
wg.Go(func() error {
defer close(downloadQueue)
for packID := range packs {
select {
case downloadQueue <- packID:
case <-wgCtx.Done():
return wgCtx.Err()
}
}
return nil
})
type repackJob struct {
tempfile *os.File
hash restic.ID
packLength int64
}
processQueue := make(chan repackJob)
// used to close processQueue once all downloaders have finished
var downloadWG sync.WaitGroup
downloader := func() error {
defer downloadWG.Done()
for packID := range downloadQueue {
// load the complete pack into a temp file
h := restic.Handle{Type: restic.PackFile, Name: packID.String()}
tempfile, hash, packLength, err := DownloadAndHash(wgCtx, repo.Backend(), h)
if err != nil {
return errors.Wrap(err, "Repack")
}
debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash)
if !packID.Equal(hash) {
return errors.Errorf("hash does not match id: want %v, got %v", packID, hash)
}
select {
case processQueue <- repackJob{tempfile, hash, packLength}:
case <-wgCtx.Done():
return wgCtx.Err()
}
}
return nil
}
downloadWG.Add(numRepackWorkers)
for i := 0; i < numRepackWorkers; i++ {
wg.Go(downloader)
}
wg.Go(func() error {
downloadWG.Wait()
close(processQueue)
return nil
})
var keepMutex sync.Mutex
worker := func() error {
for job := range processQueue {
tempfile, packID, packLength := job.tempfile, job.hash, job.packLength
blobs, _, err := pack.List(repo.Key(), tempfile, packLength)
if err != nil {
return err
}
debug.Log("processing pack %v, blobs: %v", packID, len(blobs))
var buf []byte
for _, entry := range blobs {
for pbs := range repo.Index().ListPacks(ctx, packs) {
var packBlobs []restic.Blob
keepMutex.Lock()
// filter out unnecessary blobs
for _, entry := range pbs.Blobs {
h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
keepMutex.Lock()
shouldKeep := keepBlobs.Has(h)
keepMutex.Unlock()
if !shouldKeep {
continue
if keepBlobs.Has(h) {
packBlobs = append(packBlobs, entry)
}
}
keepMutex.Unlock()
debug.Log(" process blob %v", h)
select {
case downloadQueue <- restic.PackBlobs{PackID: pbs.PackID, Blobs: packBlobs}:
case <-wgCtx.Done():
return wgCtx.Err()
}
}
return nil
})
if uint(cap(buf)) < entry.Length {
buf = make([]byte, entry.Length)
}
buf = buf[:entry.Length]
n, err := tempfile.ReadAt(buf, int64(entry.Offset))
if err != nil {
return errors.Wrap(err, "ReadAt")
}
if n != len(buf) {
return errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
h, tempfile.Name(), len(buf), n)
}
nonce, ciphertext := buf[:repo.Key().NonceSize()], buf[repo.Key().NonceSize():]
plaintext, err := repo.Key().Open(ciphertext[:0], nonce, ciphertext, nil)
worker := func() error {
for t := range downloadQueue {
err := StreamPack(wgCtx, repo.Backend().Load, repo.Key(), t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
if err != nil {
return err
}
id := restic.Hash(plaintext)
if !id.Equal(entry.ID) {
debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v",
h.Type, h.ID, tempfile.Name(), id)
return errors.Errorf("read blob %v from %v: wrong data returned, hash is %v",
h, tempfile.Name(), id)
}
keepMutex.Lock()
// recheck whether some other worker was faster
shouldKeep = keepBlobs.Has(h)
shouldKeep := keepBlobs.Has(blob)
if shouldKeep {
keepBlobs.Delete(h)
keepBlobs.Delete(blob)
}
keepMutex.Unlock()
if !shouldKeep {
continue
return nil
}
// We do want to save already saved blobs!
_, _, err = repo.SaveBlob(wgCtx, entry.Type, plaintext, entry.ID, true)
_, _, err = repo.SaveBlob(wgCtx, blob.Type, buf, blob.ID, true)
if err != nil {
return err
}
debug.Log(" saved blob %v", entry.ID)
}
if err = tempfile.Close(); err != nil {
return errors.Wrap(err, "Close")
}
if err = fs.RemoveIfExists(tempfile.Name()); err != nil {
return errors.Wrap(err, "Remove")
debug.Log(" saved blob %v", blob.ID)
return nil
})
if err != nil {
return err
}
p.Add(1)
}

View File

@ -1,30 +1,32 @@
package repository
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"sort"
"sync"
"github.com/cenkalti/backoff/v4"
"github.com/restic/chunker"
"github.com/restic/restic/internal/backend/dryrun"
"github.com/restic/restic/internal/cache"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/hashing"
"github.com/restic/restic/internal/pack"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
"github.com/minio/sha256-simd"
"golang.org/x/sync/errgroup"
)
const MaxStreamBufferSize = 4 * 1024 * 1024
// Repository is used to access a repository in a backend.
type Repository struct {
be restic.Backend
@ -742,43 +744,99 @@ type Loader interface {
Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
}
// DownloadAndHash is all-in-one helper to download content of the file at h to a temporary filesystem location
// and calculate ID of the contents. Returned (temporary) file is positioned at the beginning of the file;
// it is the reponsibility of the caller to close and delete the file.
func DownloadAndHash(ctx context.Context, be Loader, h restic.Handle) (tmpfile *os.File, hash restic.ID, size int64, err error) {
tmpfile, err = fs.TempFile("", "restic-temp-")
if err != nil {
return nil, restic.ID{}, -1, errors.Wrap(err, "TempFile")
type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
// StreamPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
// the handleBlobFn callback or an error if decryption failed or the blob hash does not match. In
// case of download errors handleBlobFn might be called multiple times for the same blob. If the
// callback returns an error, then StreamPack will abort and not retry it.
func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
if len(blobs) == 0 {
// nothing to do
return nil
}
err = be.Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) {
_, ierr = tmpfile.Seek(0, io.SeekStart)
if ierr == nil {
ierr = tmpfile.Truncate(0)
}
if ierr != nil {
return ierr
}
hrd := hashing.NewReader(rd, sha256.New())
size, ierr = io.Copy(tmpfile, hrd)
hash = restic.IDFromHash(hrd.Sum(nil))
return ierr
sort.Slice(blobs, func(i, j int) bool {
return blobs[i].Offset < blobs[j].Offset
})
h := restic.Handle{Type: restic.PackFile, Name: packID.String(), ContainedBlobType: restic.DataBlob}
if err != nil {
// ignore subsequent errors
_ = tmpfile.Close()
_ = os.Remove(tmpfile.Name())
return nil, restic.ID{}, -1, errors.Wrap(err, "Load")
}
dataStart := blobs[0].Offset
dataEnd := blobs[len(blobs)-1].Offset + blobs[len(blobs)-1].Length
_, err = tmpfile.Seek(0, io.SeekStart)
if err != nil {
// ignore subsequent errors
_ = tmpfile.Close()
_ = os.Remove(tmpfile.Name())
return nil, restic.ID{}, -1, errors.Wrap(err, "Seek")
}
debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs))
return tmpfile, hash, size, err
ctx, cancel := context.WithCancel(ctx)
// stream blobs in pack
err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
// prevent callbacks after cancelation
if ctx.Err() != nil {
return ctx.Err()
}
bufferSize := int(dataEnd - dataStart)
if bufferSize > MaxStreamBufferSize {
bufferSize = MaxStreamBufferSize
}
// create reader here to allow reusing the buffered reader from checker.checkData
bufRd := bufio.NewReaderSize(rd, bufferSize)
currentBlobEnd := dataStart
var buf []byte
for _, entry := range blobs {
skipBytes := int(entry.Offset - currentBlobEnd)
if skipBytes < 0 {
return errors.Errorf("overlapping blobs in pack %v", packID)
}
_, err := bufRd.Discard(skipBytes)
if err != nil {
return err
}
h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
debug.Log(" process blob %v, skipped %d, %v", h, skipBytes, entry)
if uint(cap(buf)) < entry.Length {
buf = make([]byte, entry.Length)
}
buf = buf[:entry.Length]
n, err := io.ReadFull(bufRd, buf)
if err != nil {
debug.Log(" read error %v", err)
return errors.Wrap(err, "ReadFull")
}
if n != len(buf) {
return errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
h, packID.Str(), len(buf), n)
}
currentBlobEnd = entry.Offset + entry.Length
if int(entry.Length) <= key.NonceSize() {
debug.Log("%v", blobs)
return errors.Errorf("invalid blob length %v", entry)
}
// decryption errors are likely permanent, give the caller a chance to skip them
nonce, ciphertext := buf[:key.NonceSize()], buf[key.NonceSize():]
plaintext, err := key.Open(ciphertext[:0], nonce, ciphertext, nil)
if err == nil {
id := restic.Hash(plaintext)
if !id.Equal(entry.ID) {
debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v",
h.Type, h.ID, packID.Str(), id)
err = errors.Errorf("read blob %v from %v: wrong data returned, hash is %v",
h, packID.Str(), id)
}
}
err = handleBlobFn(entry.BlobHandle, plaintext, err)
if err != nil {
cancel()
return backoff.Permanent(err)
}
}
return nil
})
return errors.Wrap(err, "StreamPack")
}

View File

@ -4,19 +4,22 @@ import (
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/restic/restic/internal/archiver"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
rtest "github.com/restic/restic/internal/test"
)
@ -410,108 +413,202 @@ func TestRepositoryIncrementalIndex(t *testing.T) {
t.Errorf("pack %v listed in %d indexes\n", packID, len(ids))
}
}
}
type backend struct {
rd io.Reader
}
// buildPackfileWithoutHeader returns a manually built pack file without a header.
func buildPackfileWithoutHeader(t testing.TB, blobSizes []int, key *crypto.Key) (blobs []restic.Blob, packfile []byte) {
var offset uint
for i, size := range blobSizes {
plaintext := test.Random(800+i, size)
func (be backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return fn(be.rd)
}
// we use a deterministic nonce here so the whole process is
// deterministic, last byte is the blob index
var nonce = []byte{
0x15, 0x98, 0xc0, 0xf7, 0xb9, 0x65, 0x97, 0x74,
0x12, 0xdc, 0xd3, 0x62, 0xa9, 0x6e, 0x20, byte(i),
}
type retryBackend struct {
buf []byte
}
before := len(packfile)
packfile = append(packfile, nonce...)
packfile = key.Seal(packfile, nonce, plaintext, nil)
after := len(packfile)
func (be retryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
err := fn(bytes.NewReader(be.buf[:len(be.buf)/2]))
if err != nil {
return err
ciphertextLength := after - before
blobs = append(blobs, restic.Blob{
BlobHandle: restic.BlobHandle{
ID: restic.Hash(plaintext),
Type: restic.DataBlob,
},
Length: uint(ciphertextLength),
Offset: offset,
})
offset = uint(len(packfile))
}
return fn(bytes.NewReader(be.buf))
return blobs, packfile
}
func TestDownloadAndHash(t *testing.T) {
buf := make([]byte, 5*1024*1024+881)
_, err := io.ReadFull(rnd, buf)
func TestStreamPack(t *testing.T) {
// always use the same key for deterministic output
const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
var key crypto.Key
err := json.Unmarshal([]byte(jsonKey), &key)
if err != nil {
t.Fatal(err)
}
var tests = []struct {
be repository.Loader
want []byte
}{
{
be: backend{rd: bytes.NewReader(buf)},
want: buf,
},
{
be: retryBackend{buf: buf},
want: buf,
},
blobSizes := []int{
10,
5231,
18812,
123123,
12301,
892242,
28616,
13351,
252287,
188883,
2522811,
18883,
}
for _, test := range tests {
t.Run("", func(t *testing.T) {
f, id, size, err := repository.DownloadAndHash(context.TODO(), test.be, restic.Handle{})
if err != nil {
t.Error(err)
}
packfileBlobs, packfile := buildPackfileWithoutHeader(t, blobSizes, &key)
want := restic.Hash(test.want)
if !want.Equal(id) {
t.Errorf("wrong hash returned, want %v, got %v", want.Str(), id.Str())
}
load := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
data := packfile
if size != int64(len(test.want)) {
t.Errorf("wrong size returned, want %v, got %v", test.want, size)
}
if offset > int64(len(data)) {
offset = 0
length = 0
}
data = data[offset:]
err = f.Close()
if err != nil {
t.Error(err)
}
if length > len(data) {
length = len(data)
}
err = fs.RemoveIfExists(f.Name())
if err != nil {
t.Fatal(err)
}
})
}
}
data = data[:length]
type errorReader struct {
err error
}
return fn(bytes.NewReader(data))
func (er errorReader) Read(p []byte) (n int, err error) {
return 0, er.err
}
func TestDownloadAndHashErrors(t *testing.T) {
var tests = []struct {
be repository.Loader
err string
}{
{
be: backend{rd: errorReader{errors.New("test error 1")}},
err: "test error 1",
},
}
for _, test := range tests {
t.Run("", func(t *testing.T) {
_, _, _, err := repository.DownloadAndHash(context.TODO(), test.be, restic.Handle{})
if err == nil {
t.Fatalf("wanted error %q, got nil", test.err)
}
// first, test regular usage
t.Run("regular", func(t *testing.T) {
tests := []struct {
blobs []restic.Blob
}{
{packfileBlobs[1:2]},
{packfileBlobs[2:5]},
{packfileBlobs[2:8]},
{[]restic.Blob{
packfileBlobs[0],
packfileBlobs[8],
packfileBlobs[4],
}},
{[]restic.Blob{
packfileBlobs[0],
packfileBlobs[len(packfileBlobs)-1],
}},
}
if errors.Cause(err).Error() != test.err {
t.Fatalf("wanted error %q, got %q", test.err, err)
}
})
}
for _, test := range tests {
t.Run("", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
gotBlobs := make(map[restic.ID]int)
handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error {
gotBlobs[blob.ID]++
id := restic.Hash(buf)
if !id.Equal(blob.ID) {
t.Fatalf("wrong id %v for blob %s returned", id, blob.ID)
}
return err
}
wantBlobs := make(map[restic.ID]int)
for _, blob := range test.blobs {
wantBlobs[blob.ID] = 1
}
err = repository.StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
if err != nil {
t.Fatal(err)
}
if !cmp.Equal(wantBlobs, gotBlobs) {
t.Fatal(cmp.Diff(wantBlobs, gotBlobs))
}
})
}
})
// next, test invalid uses, which should return an error
t.Run("invalid", func(t *testing.T) {
tests := []struct {
blobs []restic.Blob
err string
}{
{
// pass one blob several times
blobs: []restic.Blob{
packfileBlobs[3],
packfileBlobs[8],
packfileBlobs[3],
packfileBlobs[4],
},
err: "overlapping blobs in pack",
},
{
// pass something that's not a valid blob in the current pack file
blobs: []restic.Blob{
{
Offset: 123,
Length: 20000,
},
},
err: "ciphertext verification failed",
},
{
// pass a blob that's too small
blobs: []restic.Blob{
{
Offset: 123,
Length: 10,
},
},
err: "invalid blob length",
},
}
for _, test := range tests {
t.Run("", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error {
return err
}
err = repository.StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
if err == nil {
t.Fatalf("wanted error %v, got nil", test.err)
}
if !strings.Contains(err.Error(), test.err) {
t.Fatalf("wrong error returned, it should contain %q but was %q", test.err, err)
}
})
}
})
}

View File

@ -60,6 +60,11 @@ type Lister interface {
List(context.Context, FileType, func(FileInfo) error) error
}
type PackBlobs struct {
PackID ID
Blobs []Blob
}
// MasterIndex keeps track of the blobs are stored within files.
type MasterIndex interface {
Has(BlobHandle) bool
@ -71,4 +76,5 @@ type MasterIndex interface {
// the context is cancelled, the background goroutine terminates. This
// blocks any modification of the index.
Each(ctx context.Context) <-chan PackedBlob
ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs
}

View File

@ -1,12 +1,8 @@
package restorer
import (
"bufio"
"context"
"io"
"math"
"path/filepath"
"sort"
"sync"
"golang.org/x/sync/errgroup"
@ -14,6 +10,7 @@ import (
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
)
@ -52,7 +49,7 @@ type packInfo struct {
type fileRestorer struct {
key *crypto.Key
idx func(restic.BlobHandle) []restic.PackedBlob
packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
packLoader repository.BackendLoadFn
filesWriter *filesWriter
@ -62,7 +59,7 @@ type fileRestorer struct {
}
func newFileRestorer(dst string,
packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error,
packLoader repository.BackendLoadFn,
key *crypto.Key,
idx func(restic.BlobHandle) []restic.PackedBlob) *fileRestorer {
@ -120,7 +117,7 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error {
err := r.forEachBlob(fileBlobs, func(packID restic.ID, blob restic.Blob) {
if largeFile {
packsMap[packID] = append(packsMap[packID], fileBlobInfo{id: blob.ID, offset: fileOffset})
fileOffset += int64(blob.Length) - crypto.Extension
fileOffset += int64(restic.PlaintextLength(int(blob.Length)))
}
pack, ok := packs[packID]
if !ok {
@ -175,30 +172,19 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error {
return wg.Wait()
}
const maxBufferSize = 4 * 1024 * 1024
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
// calculate pack byte range and blob->[]files->[]offsets mappings
start, end := int64(math.MaxInt64), int64(0)
// calculate blob->[]files->[]offsets mappings
blobs := make(map[restic.ID]struct {
offset int64 // offset of the blob in the pack
length int // length of the blob
files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file
files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file
})
var blobList []restic.Blob
for file := range pack.files {
addBlob := func(blob restic.Blob, fileOffset int64) {
if start > int64(blob.Offset) {
start = int64(blob.Offset)
}
if end < int64(blob.Offset+blob.Length) {
end = int64(blob.Offset + blob.Length)
}
blobInfo, ok := blobs[blob.ID]
if !ok {
blobInfo.offset = int64(blob.Offset)
blobInfo.length = int(blob.Length)
blobInfo.files = make(map[*fileInfo][]int64)
blobList = append(blobList, blob)
blobs[blob.ID] = blobInfo
}
blobInfo.files[file] = append(blobInfo.files[file], fileOffset)
@ -228,14 +214,6 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
}
}
sortedBlobs := make([]restic.ID, 0, len(blobs))
for blobID := range blobs {
sortedBlobs = append(sortedBlobs, blobID)
}
sort.Slice(sortedBlobs, func(i, j int) bool {
return blobs[sortedBlobs[i]].offset < blobs[sortedBlobs[j]].offset
})
sanitizeError := func(file *fileInfo, err error) error {
if err != nil {
err = r.Error(file.location, err)
@ -243,59 +221,39 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
return err
}
h := restic.Handle{Type: restic.PackFile, Name: pack.id.String(), ContainedBlobType: restic.DataBlob}
err := r.packLoader(ctx, h, int(end-start), start, func(rd io.Reader) error {
bufferSize := int(end - start)
if bufferSize > maxBufferSize {
bufferSize = maxBufferSize
}
bufRd := bufio.NewReaderSize(rd, bufferSize)
currentBlobEnd := start
var blobData, buf []byte
for _, blobID := range sortedBlobs {
blob := blobs[blobID]
_, err := bufRd.Discard(int(blob.offset - currentBlobEnd))
if err != nil {
return err
}
buf, err = r.downloadBlob(bufRd, blobID, blob.length, buf)
if err != nil {
return err
}
blobData, err = r.decryptBlob(blobID, buf)
if err != nil {
for file := range blob.files {
if errFile := sanitizeError(file, err); errFile != nil {
return errFile
}
err := repository.StreamPack(ctx, r.packLoader, r.key, pack.id, blobList, func(h restic.BlobHandle, blobData []byte, err error) error {
blob := blobs[h.ID]
if err != nil {
for file := range blob.files {
if errFile := sanitizeError(file, err); errFile != nil {
return errFile
}
continue
}
currentBlobEnd = blob.offset + int64(blob.length)
for file, offsets := range blob.files {
for _, offset := range offsets {
writeToFile := func() error {
// this looks overly complicated and needs explanation
// two competing requirements:
// - must create the file once and only once
// - should allow concurrent writes to the file
// so write the first blob while holding file lock
// write other blobs after releasing the lock
createSize := int64(-1)
file.lock.Lock()
if file.inProgress {
file.lock.Unlock()
} else {
defer file.lock.Unlock()
file.inProgress = true
createSize = file.size
}
return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize)
}
err := sanitizeError(file, writeToFile())
if err != nil {
return err
return nil
}
for file, offsets := range blob.files {
for _, offset := range offsets {
writeToFile := func() error {
// this looks overly complicated and needs explanation
// two competing requirements:
// - must create the file once and only once
// - should allow concurrent writes to the file
// so write the first blob while holding file lock
// write other blobs after releasing the lock
createSize := int64(-1)
file.lock.Lock()
if file.inProgress {
file.lock.Unlock()
} else {
defer file.lock.Unlock()
file.inProgress = true
createSize = file.size
}
return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize)
}
err := sanitizeError(file, writeToFile())
if err != nil {
return err
}
}
}
@ -312,41 +270,3 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
return nil
}
func (r *fileRestorer) downloadBlob(rd io.Reader, blobID restic.ID, length int, buf []byte) ([]byte, error) {
// TODO reconcile with Repository#loadBlob implementation
if cap(buf) < length {
buf = make([]byte, length)
} else {
buf = buf[:length]
}
n, err := io.ReadFull(rd, buf)
if err != nil {
return nil, err
}
if n != length {
return nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blobID.Str(), length, n)
}
return buf, nil
}
func (r *fileRestorer) decryptBlob(blobID restic.ID, buf []byte) ([]byte, error) {
// TODO reconcile with Repository#loadBlob implementation
// decrypt
nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():]
plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil)
if err != nil {
return nil, errors.Errorf("decrypting blob %v failed: %v", blobID, err)
}
// check hash
if !restic.Hash(plaintext).Equal(blobID) {
return nil, errors.Errorf("blob %v returned invalid hash", blobID)
}
return plaintext, nil
}

View File

@ -10,6 +10,7 @@ import (
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
@ -38,7 +39,7 @@ type TestRepo struct {
filesPathToContent map[string]string
//
loader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
loader repository.BackendLoadFn
}
func (i *TestRepo) Lookup(bh restic.BlobHandle) []restic.PackedBlob {
@ -267,7 +268,7 @@ func TestErrorRestoreFiles(t *testing.T) {
r.files = repo.files
err := r.restoreFiles(context.TODO())
rtest.Equals(t, loadError, err)
rtest.Assert(t, errors.Is(err, loadError), "got %v, expected contained error %v", err, loadError)
}
func TestDownloadError(t *testing.T) {