2
2
mirror of https://github.com/octoleo/restic.git synced 2024-06-06 11:00:48 +00:00

Merge pull request #3109 from aawsome/optimize-filerestorer

restore: Don't save pack content in memory
This commit is contained in:
MichaelEischer 2021-01-03 14:53:41 +01:00 committed by GitHub
commit f7ec263a22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 122 additions and 64 deletions

View File

@ -1,11 +1,12 @@
package restorer
import (
"bytes"
"bufio"
"context"
"io"
"math"
"path/filepath"
"sort"
"sync"
"github.com/restic/restic/internal/crypto"
@ -179,6 +180,8 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error {
return nil
}
// maxBufferSize is the upper limit in bytes (4 MiB) for the buffered reader
// that downloadPack wraps around the pack data stream; smaller pack ranges
// use a buffer of exactly their own size.
const maxBufferSize = 4 * 1024 * 1024
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) {
// calculate pack byte range and blob->[]files->[]offsets mappings
@ -226,18 +229,12 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) {
}
}
packData := make([]byte, int(end-start))
h := restic.Handle{Type: restic.PackFile, Name: pack.id.String()}
err := r.packLoader(ctx, h, int(end-start), start, func(rd io.Reader) error {
l, err := io.ReadFull(rd, packData)
if err != nil {
return err
}
if l != len(packData) {
return errors.Errorf("unexpected pack size: expected %d but got %d", len(packData), l)
}
return nil
sortedBlobs := make([]restic.ID, 0, len(blobs))
for blobID := range blobs {
sortedBlobs = append(sortedBlobs, blobID)
}
sort.Slice(sortedBlobs, func(i, j int) bool {
return blobs[sortedBlobs[i]].offset < blobs[sortedBlobs[j]].offset
})
markFileError := func(file *fileInfo, err error) {
@ -248,6 +245,62 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) {
}
}
// Stream the requested byte range of the pack and process the needed blobs
// in ascending offset order, so the pack content never has to be held in
// memory as a whole.
h := restic.Handle{Type: restic.PackFile, Name: pack.id.String()}
err := r.packLoader(ctx, h, int(end-start), start, func(rd io.Reader) error {
	// buffer at most maxBufferSize bytes of the requested range
	bufferSize := int(end - start)
	if bufferSize > maxBufferSize {
		bufferSize = maxBufferSize
	}
	bufRd := bufio.NewReaderSize(rd, bufferSize)

	// currentBlobEnd is the pack offset matching the current read position
	// of bufRd; the stream starts at offset start.
	currentBlobEnd := start
	var blobData, buf []byte
	for _, blobID := range sortedBlobs {
		blob := blobs[blobID]

		// skip the pack parts between the previous blob and this one
		_, err := bufRd.Discard(int(blob.offset - currentBlobEnd))
		if err != nil {
			return err
		}

		blobData, buf, err = r.loadBlob(bufRd, blobID, blob.length, buf)
		// loadBlob reads the blob's bytes from the stream before it decrypts
		// and verifies them, so the read position has moved past this blob
		// even when decryption or the hash check fails. Advance the offset
		// unconditionally; otherwise the next Discard would skip too many
		// bytes and all following blobs would be read misaligned.
		// (If the read itself failed, later reads are expected to fail too.)
		currentBlobEnd = blob.offset + int64(blob.length)
		if err != nil {
			for file := range blob.files {
				markFileError(file, err)
			}
			continue
		}

		for file, offsets := range blob.files {
			for _, offset := range offsets {
				writeToFile := func() error {
					// Two competing requirements have to be met here:
					// - the file must be created once and only once
					// - concurrent writes to the file should be possible
					// Therefore the first blob is written while holding the
					// file lock, all other blobs are written after the lock
					// has been released.
					file.lock.Lock()
					create := file.flags&fileProgress == 0
					createSize := int64(-1)
					if create {
						defer file.lock.Unlock()
						file.flags |= fileProgress
						createSize = file.size
					} else {
						file.lock.Unlock()
					}
					return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize)
				}
				err := writeToFile()
				if err != nil {
					markFileError(file, err)
					// no point in writing further offsets of a failed file
					break
				}
			}
		}
	}

	return nil
})
if err != nil {
for file := range pack.files {
markFileError(file, err)
@ -255,72 +308,37 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) {
return
}
rd := bytes.NewReader(packData)
for blobID, blob := range blobs {
blobData, err := r.loadBlob(rd, blobID, blob.offset-start, blob.length)
if err != nil {
for file := range blob.files {
markFileError(file, err)
}
continue
}
for file, offsets := range blob.files {
for _, offset := range offsets {
writeToFile := func() error {
// this looks overly complicated and needs explanation
// two competing requirements:
// - must create the file once and only once
// - should allow concurrent writes to the file
// so write the first blob while holding file lock
// write other blobs after releasing the lock
file.lock.Lock()
create := file.flags&fileProgress == 0
createSize := int64(-1)
if create {
defer file.lock.Unlock()
file.flags |= fileProgress
createSize = file.size
} else {
file.lock.Unlock()
}
return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize)
}
err := writeToFile()
if err != nil {
markFileError(file, err)
break
}
}
}
}
}
// loadBlob reads length bytes of an encrypted blob from rd, decrypts them with
// r.key and checks that the hash of the resulting plaintext equals blobID.
// It returns the plaintext on success and an error otherwise.
// NOTE(review): this span shows the previous (io.ReaderAt, explicit offset) and
// the new (io.Reader, reusable buf) variant of the function interleaved, as
// rendered by the diff view.
func (r *fileRestorer) loadBlob(rd io.ReaderAt, blobID restic.ID, offset int64, length int) ([]byte, error) {
func (r *fileRestorer) loadBlob(rd io.Reader, blobID restic.ID, length int, buf []byte) ([]byte, []byte, error) {
// TODO reconcile with Repository#loadBlob implementation
buf := make([]byte, length)
// reuse the caller-supplied buffer when its capacity is sufficient,
// allocate a new one otherwise
if cap(buf) < length {
buf = make([]byte, length)
} else {
buf = buf[:length]
}
n, err := rd.ReadAt(buf, offset)
n, err := io.ReadFull(rd, buf)
if err != nil {
return nil, err
return nil, nil, err
}
if n != length {
return nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blobID.Str(), length, n)
return nil, nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blobID.Str(), length, n)
}
// decrypt
// the first r.key.NonceSize() bytes are the nonce, the rest is the ciphertext
nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():]
// presumably Open appends to ciphertext[:0], i.e. decrypts in place and the
// plaintext shares storage with buf — confirm against the key's Open method
plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil)
if err != nil {
return nil, errors.Errorf("decrypting blob %v failed: %v", blobID, err)
return nil, nil, errors.Errorf("decrypting blob %v failed: %v", blobID, err)
}
// check hash
if !restic.Hash(plaintext).Equal(blobID) {
return nil, errors.Errorf("blob %v returned invalid hash", blobID)
return nil, nil, errors.Errorf("blob %v returned invalid hash", blobID)
}
return plaintext, nil
return plaintext, buf, nil
}

View File

@ -152,16 +152,25 @@ func newTestRepo(content []TestFile) *TestRepo {
return repo
}
func restoreAndVerify(t *testing.T, tempdir string, content []TestFile) {
func restoreAndVerify(t *testing.T, tempdir string, content []TestFile, files map[string]bool) {
repo := newTestRepo(content)
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup)
r.files = repo.files
if files == nil {
r.files = repo.files
} else {
for _, file := range repo.files {
if files[file.location] {
r.files = append(r.files, file)
}
}
}
err := r.restoreFiles(context.TODO())
rtest.OK(t, err)
for _, file := range repo.files {
for _, file := range r.files {
target := r.targetPath(file.location)
data, err := ioutil.ReadFile(target)
if err != nil {
@ -203,5 +212,36 @@ func TestFileRestorerBasic(t *testing.T) {
{"data3-1", "pack3-1"},
},
},
})
}, nil)
}
// TestFileRestorerPackSkip restores only file2, whose blobs are a subset of
// the pack1 blobs used by file1, and verifies the restored content.
func TestFileRestorerPackSkip(t *testing.T) {
	tempdir, cleanup := rtest.TempDir(t)
	defer cleanup()

	content := []TestFile{
		{
			name: "file1",
			blobs: []TestBlob{
				{"data1-1", "pack1"},
				{"data1-2", "pack1"},
				{"data1-3", "pack1"},
				{"data1-4", "pack1"},
				{"data1-5", "pack1"},
				{"data1-6", "pack1"},
			},
		},
		{
			name: "file2",
			blobs: []TestBlob{
				// file2 lives in pack1 as well, but uses only some of its
				// blobs, so parts of the pack have to be skipped
				{"data1-2", "pack1"},
				{"data1-4", "pack1"},
				{"data1-6", "pack1"},
			},
		},
	}

	// select file2 as the only file to restore
	selected := map[string]bool{"file2": true}

	restoreAndVerify(t, tempdir, content, selected)
}