2
2
mirror of https://github.com/octoleo/restic.git synced 2024-11-22 21:05:10 +00:00

Merge pull request #2195 from ifedorenko/out-of-order-restore-no-progress

restorer: allow writing target file blobs out of order
This commit is contained in:
rawtaz 2020-02-26 20:07:51 +01:00 committed by GitHub
commit b5c7778428
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 968 additions and 1448 deletions

View File

@ -0,0 +1,17 @@
Enhancement: Simplify and improve restore performance
Significantly improves restore performance of large files (i.e. 50M+):
https://github.com/restic/restic/issues/2074
https://forum.restic.net/t/restore-using-rclone-gdrive-backend-is-slow/1112/8
https://forum.restic.net/t/degraded-restore-performance-s3-backend/1400
Fixes "not enough cache capacity" error during restore:
https://github.com/restic/restic/issues/2244
NOTE: This new implementation does not guarantee order in which blobs
are written to the target files and, for example, the last blob of a
file can be written to the file before any of the preceeding file blobs.
It is therefore possible to have gaps in the data written to the target
files if restore fails or interrupted by the user.
https://github.com/restic/restic/pull/2195

1
go.mod
View File

@ -6,6 +6,7 @@ require (
github.com/Azure/azure-sdk-for-go v27.3.0+incompatible github.com/Azure/azure-sdk-for-go v27.3.0+incompatible
github.com/Azure/go-autorest/autorest v0.9.2 // indirect github.com/Azure/go-autorest/autorest v0.9.2 // indirect
github.com/cenkalti/backoff v2.1.1+incompatible github.com/cenkalti/backoff v2.1.1+incompatible
github.com/cespare/xxhash v1.1.0
github.com/cpuguy83/go-md2man v1.0.10 // indirect github.com/cpuguy83/go-md2man v1.0.10 // indirect
github.com/dnaeon/go-vcr v1.0.1 // indirect github.com/dnaeon/go-vcr v1.0.1 // indirect
github.com/elithrar/simple-scrypt v1.3.0 github.com/elithrar/simple-scrypt v1.3.0

6
go.sum
View File

@ -20,6 +20,8 @@ github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6L
github.com/Azure/go-autorest/tracing v0.5.0 h1:TRn4WjSnkcSy5AEG3pnbtFSwNtwzjr4VYyQflFE619k= github.com/Azure/go-autorest/tracing v0.5.0 h1:TRn4WjSnkcSy5AEG3pnbtFSwNtwzjr4VYyQflFE619k=
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@ -28,6 +30,8 @@ github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/cenkalti/backoff v2.1.1+incompatible h1:tKJnvO2kl0zmb/jA5UKAt4VoEVw1qxKWjE/Bpp46npY= github.com/cenkalti/backoff v2.1.1+incompatible h1:tKJnvO2kl0zmb/jA5UKAt4VoEVw1qxKWjE/Bpp46npY=
github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
@ -145,6 +149,8 @@ github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3 h1:hBSHah
github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=

View File

@ -5,29 +5,20 @@
// request and avoiding repeated downloads of the same pack. In addition, // request and avoiding repeated downloads of the same pack. In addition,
// several pack files are fetched concurrently. // several pack files are fetched concurrently.
// //
// Here is high-level pseudo-code of the how the Restorer attempts to achieve // Here is high-level pseudo-code of how the Restorer attempts to achieve
// these goals: // these goals:
// //
// while there are packs to process // while there are packs to process
// choose a pack to process [1] // choose a pack to process [1]
// get the pack from the backend or cache [2] // retrieve the pack from the backend [2]
// write pack blobs to the files that need them [3] // write pack blobs to the files that need them [3]
// if not all pack blobs were used
// cache the pack for future use [4]
// //
// Pack download and processing (steps [2] - [4]) runs on multiple concurrent // Retrieval of repository packs (step [2]) and writing target files (step [3])
// Goroutines. The Restorer runs all steps [2]-[4] sequentially on the same // are performed concurrently on multiple goroutines.
// Goroutine.
// //
// Before a pack is downloaded (step [2]), the required space is "reserved" in // Implementation does not guarantee order in which blobs are written to the
// the pack cache. Actual download uses single backend request to get all // target files and, for example, the last blob of a file can be written to the
// required pack blobs. This may download blobs that are not needed, but we // file before any of the preceeding file blobs. It is therefore possible to
// assume it'll still be faster than getting individual blobs. // have gaps in the data written to the target files if restore fails or
// // interrupted by the user.
// Target files are written (step [3]) in the "right" order, first file blob
// first, then second, then third and so on. Blob write order implies that some
// pack blobs may not be immediately used, i.e. they are "out of order" for
// their respective target files. Packs with unused blobs are cached (step
// [4]). The cache has capacity limit and may purge packs before they are fully
// used, in which case the purged packs will need to be re-downloaded.
package restorer package restorer

View File

@ -1,52 +0,0 @@
package restorer
import (
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
)
type filePackTraverser struct {
lookup func(restic.ID, restic.BlobType) ([]restic.PackedBlob, bool)
}
// iterates over all remaining packs of the file
func (t *filePackTraverser) forEachFilePack(file *fileInfo, fn func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool) error {
if len(file.blobs) == 0 {
return nil
}
getBlobPack := func(blobID restic.ID) (restic.PackedBlob, error) {
packs, found := t.lookup(blobID, restic.DataBlob)
if !found {
return restic.PackedBlob{}, errors.Errorf("Unknown blob %s", blobID.String())
}
// TODO which pack to use if multiple packs have the blob?
// MUST return the same pack for the same blob during the same execution
return packs[0], nil
}
var prevPackID restic.ID
var prevPackBlobs []restic.Blob
packIdx := 0
for _, blobID := range file.blobs {
packedBlob, err := getBlobPack(blobID)
if err != nil {
return err
}
if !prevPackID.IsNull() && prevPackID != packedBlob.PackID {
if !fn(packIdx, prevPackID, prevPackBlobs) {
return nil
}
packIdx++
}
if prevPackID != packedBlob.PackID {
prevPackID = packedBlob.PackID
prevPackBlobs = make([]restic.Blob, 0)
}
prevPackBlobs = append(prevPackBlobs, packedBlob.Blob)
}
if len(prevPackBlobs) > 0 {
fn(packIdx, prevPackID, prevPackBlobs)
}
return nil
}

View File

@ -1,9 +1,12 @@
package restorer package restorer
import ( import (
"bytes"
"context" "context"
"io" "io"
"math"
"path/filepath" "path/filepath"
"sync"
"github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
@ -15,66 +18,58 @@ import (
// TODO evaluate if it makes sense to split download and processing workers // TODO evaluate if it makes sense to split download and processing workers
// pro: can (slowly) read network and decrypt/write files concurrently // pro: can (slowly) read network and decrypt/write files concurrently
// con: each worker needs to keep one pack in memory // con: each worker needs to keep one pack in memory
// TODO evaluate memory footprint for larger repositories, say 10M packs/10M files
// TODO consider replacing pack file cache with blob cache
// TODO avoid decrypting the same blob multiple times
// TODO evaluate disabled debug logging overhead for large repositories
const ( const (
workerCount = 8 workerCount = 8
// max number of cached open output file handles // fileInfo flags
filesWriterCacheCap = 32 fileProgress = 1
fileError = 2
// estimated average pack size used to calculate pack cache capacity largeFileBlobCount = 25
averagePackSize = 5 * 1024 * 1024
// pack cache capacity should support at least one cached pack per worker
// allow space for extra 5 packs for actual caching
packCacheCapacity = (workerCount + 5) * averagePackSize
) )
// information about regular file being restored // information about regular file being restored
type fileInfo struct { type fileInfo struct {
lock sync.Mutex
flags int
location string // file on local filesystem relative to restorer basedir location string // file on local filesystem relative to restorer basedir
blobs []restic.ID // remaining blobs of the file blobs interface{} // blobs of the file
}
type fileBlobInfo struct {
id restic.ID // the blob id
offset int64 // blob offset in the file
} }
// information about a data pack required to restore one or more files // information about a data pack required to restore one or more files
type packInfo struct { type packInfo struct {
// the pack id id restic.ID // the pack id
id restic.ID files map[*fileInfo]struct{} // set of files that use blobs from this pack
// set of files that use blobs from this pack
files map[*fileInfo]struct{}
// number of other packs that must be downloaded before all blobs in this pack can be used
cost int
// used by packHeap
index int
} }
// fileRestorer restores set of files // fileRestorer restores set of files
type fileRestorer struct { type fileRestorer struct {
key *crypto.Key key *crypto.Key
idx filePackTraverser idx func(restic.ID, restic.BlobType) ([]restic.PackedBlob, bool)
packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
packCache *packCache // pack cache filesWriter *filesWriter
filesWriter *filesWriter // file write
dst string dst string
files []*fileInfo files []*fileInfo
} }
func newFileRestorer(dst string, packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error, key *crypto.Key, idx filePackTraverser) *fileRestorer { func newFileRestorer(dst string,
packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error,
key *crypto.Key,
idx func(restic.ID, restic.BlobType) ([]restic.PackedBlob, bool)) *fileRestorer {
return &fileRestorer{ return &fileRestorer{
packLoader: packLoader,
key: key, key: key,
idx: idx, idx: idx,
filesWriter: newFilesWriter(filesWriterCacheCap), packLoader: packLoader,
packCache: newPackCache(packCacheCapacity), filesWriter: newFilesWriter(workerCount),
dst: dst, dst: dst,
} }
} }
@ -87,237 +82,237 @@ func (r *fileRestorer) targetPath(location string) string {
return filepath.Join(r.dst, location) return filepath.Join(r.dst, location)
} }
// used to pass information among workers (wish golang channels allowed multivalues) func (r *fileRestorer) forEachBlob(blobIDs []restic.ID, fn func(packID restic.ID, packBlob restic.Blob)) error {
type processingInfo struct { if len(blobIDs) == 0 {
pack *packInfo return nil
files map[*fileInfo]error
} }
func (r *fileRestorer) restoreFiles(ctx context.Context, onError func(path string, err error)) error { for _, blobID := range blobIDs {
// TODO conditionally enable when debug log is on packs, found := r.idx(blobID, restic.DataBlob)
// for _, file := range r.files { if !found {
// dbgmsg := file.location + ": " return errors.Errorf("Unknown blob %s", blobID.String())
// r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
// if packIdx > 0 {
// dbgmsg += ", "
// }
// dbgmsg += "pack{id=" + packID.Str() + ", blobs: "
// for blobIdx, blob := range packBlobs {
// if blobIdx > 0 {
// dbgmsg += ", "
// }
// dbgmsg += blob.ID.Str()
// }
// dbgmsg += "}"
// return true // keep going
// })
// debug.Log(dbgmsg)
// }
inprogress := make(map[*fileInfo]struct{})
queue, err := newPackQueue(r.idx, r.files, func(files map[*fileInfo]struct{}) bool {
for file := range files {
if _, found := inprogress[file]; found {
return true
}
}
return false
})
if err != nil {
return err
}
// workers
downloadCh := make(chan processingInfo)
feedbackCh := make(chan processingInfo)
defer close(downloadCh)
defer close(feedbackCh)
worker := func() {
for {
select {
case <-ctx.Done():
return
case request, ok := <-downloadCh:
if !ok {
return // channel closed
}
rd, err := r.downloadPack(ctx, request.pack)
if err == nil {
r.processPack(ctx, request, rd)
} else {
// mark all files as failed
for file := range request.files {
request.files[file] = err
}
}
feedbackCh <- request
}
}
}
for i := 0; i < workerCount; i++ {
go worker()
}
processFeedback := func(pack *packInfo, ferrors map[*fileInfo]error) {
// update files blobIdx
// must do it here to avoid race among worker and processing feedback threads
var success []*fileInfo
var failure []*fileInfo
for file, ferr := range ferrors {
target := r.targetPath(file.location)
if ferr != nil {
onError(file.location, ferr)
r.filesWriter.close(target)
delete(inprogress, file)
failure = append(failure, file)
} else {
r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
file.blobs = file.blobs[len(packBlobs):]
return false // only interesed in the first pack
})
if len(file.blobs) == 0 {
r.filesWriter.close(target)
delete(inprogress, file)
}
success = append(success, file)
}
}
// update the queue and requeueu the pack as necessary
if !queue.requeuePack(pack, success, failure) {
r.packCache.remove(pack.id)
debug.Log("Purged used up pack %s from pack cache", pack.id.Str())
}
}
// the main restore loop
for !queue.isEmpty() {
debug.Log("-----------------------------------")
pack, files := queue.nextPack()
if pack != nil {
ferrors := make(map[*fileInfo]error)
for _, file := range files {
ferrors[file] = nil
inprogress[file] = struct{}{}
}
select {
case <-ctx.Done():
return ctx.Err()
case downloadCh <- processingInfo{pack: pack, files: ferrors}:
debug.Log("Scheduled download pack %s (%d files)", pack.id.Str(), len(files))
case feedback := <-feedbackCh:
queue.requeuePack(pack, []*fileInfo{}, []*fileInfo{}) // didn't use the pack during this iteration
processFeedback(feedback.pack, feedback.files)
}
} else {
select {
case <-ctx.Done():
return ctx.Err()
case feedback := <-feedbackCh:
processFeedback(feedback.pack, feedback.files)
}
} }
fn(packs[0].PackID, packs[0].Blob)
} }
return nil return nil
} }
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) (readerAtCloser, error) { func (r *fileRestorer) restoreFiles(ctx context.Context) error {
const MaxInt64 = 1<<63 - 1 // odd Go does not have this predefined somewhere
// calculate pack byte range packs := make(map[restic.ID]*packInfo) // all packs
start, end := int64(MaxInt64), int64(0)
// create packInfo from fileInfo
for _, file := range r.files {
fileBlobs := file.blobs.(restic.IDs)
largeFile := len(fileBlobs) > largeFileBlobCount
var packsMap map[restic.ID][]fileBlobInfo
if largeFile {
packsMap = make(map[restic.ID][]fileBlobInfo)
}
fileOffset := int64(0)
err := r.forEachBlob(fileBlobs, func(packID restic.ID, blob restic.Blob) {
if largeFile {
packsMap[packID] = append(packsMap[packID], fileBlobInfo{id: blob.ID, offset: fileOffset})
fileOffset += int64(blob.Length) - crypto.Extension
}
pack, ok := packs[packID]
if !ok {
pack = &packInfo{
id: packID,
files: make(map[*fileInfo]struct{}),
}
packs[packID] = pack
}
pack.files[file] = struct{}{}
})
if err != nil {
// repository index is messed up, can't do anything
return err
}
if largeFile {
file.blobs = packsMap
}
}
var wg sync.WaitGroup
downloadCh := make(chan *packInfo)
worker := func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return // context cancelled
case pack, ok := <-downloadCh:
if !ok {
return // channel closed
}
r.downloadPack(ctx, pack)
}
}
}
for i := 0; i < workerCount; i++ {
go worker()
wg.Add(1)
}
// the main restore loop
for _, pack := range packs {
select {
case <-ctx.Done():
return ctx.Err()
case downloadCh <- pack:
debug.Log("Scheduled download pack %s", pack.id.Str())
}
}
close(downloadCh)
wg.Wait()
return nil
}
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) {
// calculate pack byte range and blob->[]files->[]offsets mappings
start, end := int64(math.MaxInt64), int64(0)
blobs := make(map[restic.ID]struct {
offset int64 // offset of the blob in the pack
length int // length of the blob
files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file
})
for file := range pack.files { for file := range pack.files {
r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { addBlob := func(blob restic.Blob, fileOffset int64) {
if packID.Equal(pack.id) {
for _, blob := range packBlobs {
if start > int64(blob.Offset) { if start > int64(blob.Offset) {
start = int64(blob.Offset) start = int64(blob.Offset)
} }
if end < int64(blob.Offset+blob.Length) { if end < int64(blob.Offset+blob.Length) {
end = int64(blob.Offset + blob.Length) end = int64(blob.Offset + blob.Length)
} }
blobInfo, ok := blobs[blob.ID]
if !ok {
blobInfo.offset = int64(blob.Offset)
blobInfo.length = int(blob.Length)
blobInfo.files = make(map[*fileInfo][]int64)
blobs[blob.ID] = blobInfo
} }
blobInfo.files[file] = append(blobInfo.files[file], fileOffset)
} }
if fileBlobs, ok := file.blobs.(restic.IDs); ok {
return true // keep going fileOffset := int64(0)
r.forEachBlob(fileBlobs, func(packID restic.ID, blob restic.Blob) {
if packID.Equal(pack.id) {
addBlob(blob, fileOffset)
}
fileOffset += int64(blob.Length) - crypto.Extension
}) })
} else if packsMap, ok := file.blobs.(map[restic.ID][]fileBlobInfo); ok {
for _, blob := range packsMap[pack.id] {
idxPacks, found := r.idx(blob.id, restic.DataBlob)
if found {
for _, idxPack := range idxPacks {
if idxPack.PackID.Equal(pack.id) {
addBlob(idxPack.Blob, blob.offset)
break
}
}
}
}
}
} }
packReader, err := r.packCache.get(pack.id, start, int(end-start), func(offset int64, length int, wr io.WriteSeeker) error { packData := make([]byte, int(end-start))
h := restic.Handle{Type: restic.DataFile, Name: pack.id.String()} h := restic.Handle{Type: restic.DataFile, Name: pack.id.String()}
return r.packLoader(ctx, h, length, offset, func(rd io.Reader) error { err := r.packLoader(ctx, h, int(end-start), start, func(rd io.Reader) error {
// reset the file in case of a download retry l, err := io.ReadFull(rd, packData)
_, err := wr.Seek(0, io.SeekStart)
if err != nil { if err != nil {
return err return err
} }
if l != len(packData) {
len, err := io.Copy(wr, rd) return errors.Errorf("unexpected pack size: expected %d but got %d", len(packData), l)
if err != nil {
return err
} }
if len != int64(length) {
return errors.Errorf("unexpected pack size: expected %d but got %d", length, len)
}
return nil return nil
}) })
})
markFileError := func(file *fileInfo, err error) {
file.lock.Lock()
defer file.lock.Unlock()
if file.flags&fileError == 0 {
file.flags |= fileError
}
}
if err != nil { if err != nil {
return nil, err for file := range pack.files {
markFileError(file, err)
}
return
} }
return packReader, nil rd := bytes.NewReader(packData)
}
func (r *fileRestorer) processPack(ctx context.Context, request processingInfo, rd readerAtCloser) { for blobID, blob := range blobs {
defer rd.Close() blobData, err := r.loadBlob(rd, blobID, blob.offset-start, blob.length)
for file := range request.files {
target := r.targetPath(file.location)
r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
for _, blob := range packBlobs {
debug.Log("Writing blob %s (%d bytes) from pack %s to %s", blob.ID.Str(), blob.Length, packID.Str(), file.location)
buf, err := r.loadBlob(rd, blob)
if err == nil {
err = r.filesWriter.writeToFile(target, buf)
}
if err != nil { if err != nil {
request.files[file] = err for file := range blob.files {
break // could not restore the file markFileError(file, err)
}
continue
}
for file, offsets := range blob.files {
for _, offset := range offsets {
writeToFile := func() error {
// this looks overly complicated and needs explanation
// two competing requirements:
// - must create the file once and only once
// - should allow concurrent writes to the file
// so write the first blob while holding file lock
// write other blobs after releasing the lock
file.lock.Lock()
create := file.flags&fileProgress == 0
if create {
defer file.lock.Unlock()
file.flags |= fileProgress
} else {
file.lock.Unlock()
}
return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, create)
}
err := writeToFile()
if err != nil {
markFileError(file, err)
break
}
} }
} }
return false
})
} }
} }
func (r *fileRestorer) loadBlob(rd io.ReaderAt, blob restic.Blob) ([]byte, error) { func (r *fileRestorer) loadBlob(rd io.ReaderAt, blobID restic.ID, offset int64, length int) ([]byte, error) {
// TODO reconcile with Repository#loadBlob implementation // TODO reconcile with Repository#loadBlob implementation
buf := make([]byte, blob.Length) buf := make([]byte, length)
n, err := rd.ReadAt(buf, int64(blob.Offset)) n, err := rd.ReadAt(buf, offset)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if n != int(blob.Length) { if n != length {
return nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blob.ID.Str(), blob.Length, n) return nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blobID.Str(), length, n)
} }
// decrypt // decrypt
nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():] nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():]
plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil) plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil)
if err != nil { if err != nil {
return nil, errors.Errorf("decrypting blob %v failed: %v", blob.ID, err) return nil, errors.Errorf("decrypting blob %v failed: %v", blobID, err)
} }
// check hash // check hash
if !restic.Hash(plaintext).Equal(blob.ID) { if !restic.Hash(plaintext).Equal(blobID) {
return nil, errors.Errorf("blob %v returned invalid hash", blob.ID) return nil, errors.Errorf("blob %v returned invalid hash", blobID)
} }
return plaintext, nil return plaintext, nil

View File

@ -8,7 +8,6 @@ import (
"testing" "testing"
"github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test" rtest "github.com/restic/restic/internal/test"
) )
@ -38,9 +37,6 @@ type TestRepo struct {
// //
loader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error loader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
//
idx filePackTraverser
} }
func (i *TestRepo) Lookup(blobID restic.ID, _ restic.BlobType) ([]restic.PackedBlob, bool) { func (i *TestRepo) Lookup(blobID restic.ID, _ restic.BlobType) ([]restic.PackedBlob, bool) {
@ -56,11 +52,6 @@ func (i *TestRepo) packID(name string) restic.ID {
return i.packsNameToID[name] return i.packsNameToID[name]
} }
func (i *TestRepo) pack(queue *packQueue, name string) *packInfo {
id := i.packsNameToID[name]
return queue.packs[id]
}
func (i *TestRepo) fileContent(file *fileInfo) string { func (i *TestRepo) fileContent(file *fileInfo) string {
return i.filesPathToContent[file.location] return i.filesPathToContent[file.location]
} }
@ -147,7 +138,6 @@ func newTestRepo(content []TestFile) *TestRepo {
files: files, files: files,
filesPathToContent: filesPathToContent, filesPathToContent: filesPathToContent,
} }
repo.idx = filePackTraverser{lookup: repo.Lookup}
repo.loader = func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { repo.loader = func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
packID, err := restic.ParseID(h.Name) packID, err := restic.ParseID(h.Name)
if err != nil { if err != nil {
@ -163,12 +153,11 @@ func newTestRepo(content []TestFile) *TestRepo {
func restoreAndVerify(t *testing.T, tempdir string, content []TestFile) { func restoreAndVerify(t *testing.T, tempdir string, content []TestFile) {
repo := newTestRepo(content) repo := newTestRepo(content)
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.idx) r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup)
r.files = repo.files r.files = repo.files
r.restoreFiles(context.TODO(), func(path string, err error) { err := r.restoreFiles(context.TODO())
rtest.OK(t, errors.Wrapf(err, "unexpected error")) rtest.OK(t, err)
})
for _, file := range repo.files { for _, file := range repo.files {
target := r.targetPath(file.location) target := r.targetPath(file.location)
@ -178,16 +167,11 @@ func restoreAndVerify(t *testing.T, tempdir string, content []TestFile) {
continue continue
} }
_, contains := r.filesWriter.cache[target]
rtest.Equals(t, false, contains)
content := repo.fileContent(file) content := repo.fileContent(file)
if !bytes.Equal(data, []byte(content)) { if !bytes.Equal(data, []byte(content)) {
t.Errorf("file %v has wrong content: want %q, got %q", file.location, content, data) t.Errorf("file %v has wrong content: want %q, got %q", file.location, content, data)
} }
} }
rtest.OK(t, nil)
} }
func TestFileRestorerBasic(t *testing.T) { func TestFileRestorerBasic(t *testing.T) {
@ -209,5 +193,13 @@ func TestFileRestorerBasic(t *testing.T) {
TestBlob{"data2-2", "pack2-2"}, TestBlob{"data2-2", "pack2-2"},
}, },
}, },
TestFile{
name: "file3",
blobs: []TestBlob{
// same blob multiple times
TestBlob{"data3-1", "pack3-1"},
TestBlob{"data3-1", "pack3-1"},
},
},
}) })
} }

View File

@ -4,98 +4,89 @@ import (
"os" "os"
"sync" "sync"
"github.com/restic/restic/internal/debug" "github.com/cespare/xxhash"
"github.com/restic/restic/internal/errors"
) )
// Writes blobs to output files. Each file is written sequentially, // writes blobs to target files.
// start to finish, but multiple files can be written to concurrently. // multiple files can be written to concurrently.
// Implementation allows virtually unlimited number of logically open // multiple blobs can be concurrently written to the same file.
// files, but number of phisically open files will never exceed number // TODO I am not 100% convinced this is necessary, i.e. it may be okay
// of concurrent writeToFile invocations plus cacheCap. // to use multiple os.File to write to the same target file
type filesWriter struct { type filesWriter struct {
lock sync.Mutex // guards concurrent access to open files cache buckets []filesWriterBucket
inprogress map[string]struct{} // (logically) opened file writers
cache map[string]*os.File // cache of open files
cacheCap int // max number of cached open files
} }
func newFilesWriter(cacheCap int) *filesWriter { type filesWriterBucket struct {
lock sync.Mutex
files map[string]*os.File
users map[string]int
}
func newFilesWriter(count int) *filesWriter {
buckets := make([]filesWriterBucket, count)
for b := 0; b < count; b++ {
buckets[b].files = make(map[string]*os.File)
buckets[b].users = make(map[string]int)
}
return &filesWriter{ return &filesWriter{
inprogress: make(map[string]struct{}), buckets: buckets,
cache: make(map[string]*os.File),
cacheCap: cacheCap,
} }
} }
func (w *filesWriter) writeToFile(path string, blob []byte) error { func (w *filesWriter) writeToFile(path string, blob []byte, offset int64, create bool) error {
// First writeToFile invocation for any given path will: bucket := &w.buckets[uint(xxhash.Sum64String(path))%uint(len(w.buckets))]
// - create and open the file
// - write the blob to the file
// - cache the open file if there is space, close the file otherwise
// Subsequent invocations will:
// - remove the open file from the cache _or_ open the file for append
// - write the blob to the file
// - cache the open file if there is space, close the file otherwise
// The idea is to cap maximum number of open files with minimal
// coordination among concurrent writeToFile invocations (note that
// writeToFile never touches somebody else's open file).
// TODO measure if caching is useful (likely depends on operating system
// and hardware configuration)
acquireWriter := func() (*os.File, error) { acquireWriter := func() (*os.File, error) {
w.lock.Lock() bucket.lock.Lock()
defer w.lock.Unlock() defer bucket.lock.Unlock()
if wr, ok := w.cache[path]; ok {
debug.Log("Used cached writer for %s", path) if wr, ok := bucket.files[path]; ok {
delete(w.cache, path) bucket.users[path]++
return wr, nil return wr, nil
} }
var flags int var flags int
if _, append := w.inprogress[path]; append { if create {
flags = os.O_APPEND | os.O_WRONLY
} else {
w.inprogress[path] = struct{}{}
flags = os.O_CREATE | os.O_TRUNC | os.O_WRONLY flags = os.O_CREATE | os.O_TRUNC | os.O_WRONLY
} else {
flags = os.O_WRONLY
} }
wr, err := os.OpenFile(path, flags, 0600) wr, err := os.OpenFile(path, flags, 0600)
if err != nil { if err != nil {
return nil, err return nil, err
} }
debug.Log("Opened writer for %s", path)
bucket.files[path] = wr
bucket.users[path] = 1
return wr, nil return wr, nil
} }
cacheOrCloseWriter := func(wr *os.File) {
w.lock.Lock() releaseWriter := func(wr *os.File) error {
defer w.lock.Unlock() bucket.lock.Lock()
if len(w.cache) < w.cacheCap { defer bucket.lock.Unlock()
w.cache[path] = wr
} else { if bucket.users[path] == 1 {
wr.Close() delete(bucket.files, path)
delete(bucket.users, path)
return wr.Close()
} }
bucket.users[path]--
return nil
} }
wr, err := acquireWriter() wr, err := acquireWriter()
if err != nil { if err != nil {
return err return err
} }
n, err := wr.Write(blob)
cacheOrCloseWriter(wr) _, err = wr.WriteAt(blob, offset)
if err != nil { if err != nil {
releaseWriter(wr)
return err return err
} }
if n != len(blob) {
return errors.Errorf("error writing file %v: wrong length written, want %d, got %d", path, len(blob), n)
}
return nil
}
func (w *filesWriter) close(path string) { return releaseWriter(wr)
w.lock.Lock()
defer w.lock.Unlock()
if wr, ok := w.cache[path]; ok {
wr.Close()
delete(w.cache, path)
}
delete(w.inprogress, path)
} }

View File

@ -16,23 +16,21 @@ func TestFilesWriterBasic(t *testing.T) {
f1 := dir + "/f1" f1 := dir + "/f1"
f2 := dir + "/f2" f2 := dir + "/f2"
rtest.OK(t, w.writeToFile(f1, []byte{1})) rtest.OK(t, w.writeToFile(f1, []byte{1}, 0, true))
rtest.Equals(t, 1, len(w.cache)) rtest.Equals(t, 0, len(w.buckets[0].files))
rtest.Equals(t, 1, len(w.inprogress)) rtest.Equals(t, 0, len(w.buckets[0].users))
rtest.OK(t, w.writeToFile(f2, []byte{2})) rtest.OK(t, w.writeToFile(f2, []byte{2}, 0, true))
rtest.Equals(t, 1, len(w.cache)) rtest.Equals(t, 0, len(w.buckets[0].files))
rtest.Equals(t, 2, len(w.inprogress)) rtest.Equals(t, 0, len(w.buckets[0].users))
rtest.OK(t, w.writeToFile(f1, []byte{1})) rtest.OK(t, w.writeToFile(f1, []byte{1}, 1, false))
w.close(f1) rtest.Equals(t, 0, len(w.buckets[0].files))
rtest.Equals(t, 0, len(w.cache)) rtest.Equals(t, 0, len(w.buckets[0].users))
rtest.Equals(t, 1, len(w.inprogress))
rtest.OK(t, w.writeToFile(f2, []byte{2})) rtest.OK(t, w.writeToFile(f2, []byte{2}, 1, false))
w.close(f2) rtest.Equals(t, 0, len(w.buckets[0].files))
rtest.Equals(t, 0, len(w.cache)) rtest.Equals(t, 0, len(w.buckets[0].users))
rtest.Equals(t, 0, len(w.inprogress))
buf, err := ioutil.ReadFile(f1) buf, err := ioutil.ReadFile(f1)
rtest.OK(t, err) rtest.OK(t, err)

View File

@ -1,243 +0,0 @@
package restorer
import (
"io"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
)
// packCache is thread safe in-memory cache of pack files required to restore
// one or more files. The cache is meant to hold pack files that cannot be
// fully used right away. This happens when pack files contains blobs from
// "head" of some files and "middle" of other files. "Middle" blobs cannot be
// written to their files until after blobs from some other packs are written
// to the files first.
//
// While the cache is thread safe, implementation assumes (and enforces)
// that individual entries are used by one client at a time. Clients must
// #Close() entry's reader to make the entry available for use by other
// clients. This limitation can be relaxed in the future if necessary.
type packCache struct {
// guards access to cache internal data structures
lock sync.Mutex
// cache capacity
capacity int
reservedCapacity int
allocatedCapacity int
// pack records currently being used by active restore worker
reservedPacks map[restic.ID]*packCacheRecord
// unused allocated packs, can be deleted if necessary
cachedPacks map[restic.ID]*packCacheRecord
}
type packCacheRecord struct {
master *packCacheRecord
cache *packCache
id restic.ID // cached pack id
offset int64 // cached pack byte range
data []byte
}
type readerAtCloser interface {
io.Closer
io.ReaderAt
}
type bytesWriteSeeker struct {
pos int
data []byte
}
func (wr *bytesWriteSeeker) Write(p []byte) (n int, err error) {
if wr.pos+len(p) > len(wr.data) {
return -1, errors.Errorf("not enough space")
}
n = copy(wr.data[wr.pos:], p)
wr.pos += n
return n, nil
}
func (wr *bytesWriteSeeker) Seek(offset int64, whence int) (int64, error) {
if offset != 0 || whence != io.SeekStart {
return -1, errors.Errorf("unsupported seek request")
}
wr.pos = 0
return 0, nil
}
func newPackCache(capacity int) *packCache {
return &packCache{
capacity: capacity,
reservedPacks: make(map[restic.ID]*packCacheRecord),
cachedPacks: make(map[restic.ID]*packCacheRecord),
}
}
func (c *packCache) reserve(packID restic.ID, offset int64, length int) (record *packCacheRecord, err error) {
c.lock.Lock()
defer c.lock.Unlock()
if offset < 0 || length <= 0 {
return nil, errors.Errorf("illegal pack cache allocation range %s {offset: %d, length: %d}", packID.Str(), offset, length)
}
if c.reservedCapacity+length > c.capacity {
return nil, errors.Errorf("not enough cache capacity: requested %d, available %d", length, c.capacity-c.reservedCapacity)
}
if _, ok := c.reservedPacks[packID]; ok {
return nil, errors.Errorf("pack is already reserved %s", packID.Str())
}
// the pack is available in the cache and currently unused
if pack, ok := c.cachedPacks[packID]; ok {
// check if cached pack includes requested byte range
// the range can shrink, but it never grows bigger unless there is a bug elsewhere
if pack.offset > offset || (pack.offset+int64(len(pack.data))) < (offset+int64(length)) {
return nil, errors.Errorf("cached range %d-%d is smaller than requested range %d-%d for pack %s", pack.offset, pack.offset+int64(len(pack.data)), length, offset+int64(length), packID.Str())
}
// move the pack to the used map
delete(c.cachedPacks, packID)
c.reservedPacks[packID] = pack
c.reservedCapacity += len(pack.data)
debug.Log("Using cached pack %s (%d bytes)", pack.id.Str(), len(pack.data))
if pack.offset != offset || len(pack.data) != length {
// restrict returned record to requested range
return &packCacheRecord{
cache: c,
master: pack,
offset: offset,
data: pack.data[int(offset-pack.offset) : int(offset-pack.offset)+length],
}, nil
}
return pack, nil
}
for c.allocatedCapacity+length > c.capacity {
// all cached packs will be needed at some point
// so it does not matter which one to purge
for _, cached := range c.cachedPacks {
delete(c.cachedPacks, cached.id)
c.allocatedCapacity -= len(cached.data)
debug.Log("dropped cached pack %s (%d bytes)", cached.id.Str(), len(cached.data))
break
}
}
pack := &packCacheRecord{
cache: c,
id: packID,
offset: offset,
}
c.reservedPacks[pack.id] = pack
c.allocatedCapacity += length
c.reservedCapacity += length
return pack, nil
}
// get returns reader of the specified cached pack. Uses provided load func
// to download pack content if necessary.
// The returned reader is only able to read pack within byte range specified
// by offset and length parameters, attempts to read outside that range will
// result in an error.
// The returned reader must be closed before the same packID can be requested
// from the cache again.
func (c *packCache) get(packID restic.ID, offset int64, length int, load func(offset int64, length int, wr io.WriteSeeker) error) (readerAtCloser, error) {
pack, err := c.reserve(packID, offset, length)
if err != nil {
return nil, err
}
if pack.data == nil {
releasePack := func() {
delete(c.reservedPacks, pack.id)
c.reservedCapacity -= length
c.allocatedCapacity -= length
}
wr := &bytesWriteSeeker{data: make([]byte, length)}
err = load(offset, length, wr)
if err != nil {
releasePack()
return nil, err
}
if wr.pos != length {
releasePack()
return nil, errors.Errorf("invalid read size")
}
pack.data = wr.data
debug.Log("Downloaded and cached pack %s (%d bytes)", pack.id.Str(), len(pack.data))
}
return pack, nil
}
// releases the pack record back to the cache
func (c *packCache) release(pack *packCacheRecord) error {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.reservedPacks[pack.id]; !ok {
return errors.Errorf("invalid pack release request")
}
delete(c.reservedPacks, pack.id)
c.cachedPacks[pack.id] = pack
c.reservedCapacity -= len(pack.data)
return nil
}
// remove removes specified pack from the cache and frees
// corresponding cache space. should be called after the pack
// was fully used up by the restorer.
func (c *packCache) remove(packID restic.ID) error {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.reservedPacks[packID]; ok {
return errors.Errorf("invalid pack remove request, pack %s is reserved", packID.Str())
}
pack, ok := c.cachedPacks[packID]
if !ok {
return errors.Errorf("invalid pack remove request, pack %s is not cached", packID.Str())
}
delete(c.cachedPacks, pack.id)
c.allocatedCapacity -= len(pack.data)
return nil
}
// ReadAt reads len(b) bytes from the pack starting at byte offset off.
// It returns the number of bytes read and the error, if any.
func (r *packCacheRecord) ReadAt(b []byte, off int64) (n int, err error) {
if off < r.offset || off+int64(len(b)) > r.offset+int64(len(r.data)) {
return -1, errors.Errorf("read outside available range")
}
return copy(b, r.data[off-r.offset:]), nil
}
// Close closes the pack reader and releases corresponding cache record
// to the cache. Once closed, the record can be reused by subsequent
// requests for the same packID or it can be purged from the cache to make
// room for other packs
func (r *packCacheRecord) Close() (err error) {
if r.master != nil {
return r.cache.release(r.master)
}
return r.cache.release(r)
}

View File

@ -1,305 +0,0 @@
package restorer
import (
"io"
"testing"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
func assertNotOK(t *testing.T, msg string, err error) {
rtest.Assert(t, err != nil, msg+" did not fail")
}
func TestBytesWriterSeeker(t *testing.T) {
wr := &bytesWriteSeeker{data: make([]byte, 10)}
n, err := wr.Write([]byte{1, 2})
rtest.OK(t, err)
rtest.Equals(t, 2, n)
rtest.Equals(t, []byte{1, 2}, wr.data[0:2])
n64, err := wr.Seek(0, io.SeekStart)
rtest.OK(t, err)
rtest.Equals(t, int64(0), n64)
n, err = wr.Write([]byte{0, 1, 2, 3, 4})
rtest.OK(t, err)
rtest.Equals(t, 5, n)
n, err = wr.Write([]byte{5, 6, 7, 8, 9})
rtest.OK(t, err)
rtest.Equals(t, 5, n)
rtest.Equals(t, []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, wr.data)
// negative tests
_, err = wr.Write([]byte{1})
assertNotOK(t, "write overflow", err)
_, err = wr.Seek(1, io.SeekStart)
assertNotOK(t, "unsupported seek", err)
}
func TestPackCacheBasic(t *testing.T) {
assertReader := func(expected []byte, offset int64, rd io.ReaderAt) {
actual := make([]byte, len(expected))
rd.ReadAt(actual, offset)
rtest.Equals(t, expected, actual)
}
c := newPackCache(10)
id := restic.NewRandomID()
// load pack to the cache
rd, err := c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error {
rtest.Equals(t, int64(10), offset)
rtest.Equals(t, 5, length)
wr.Write([]byte{1, 2, 3, 4, 5})
return nil
})
rtest.OK(t, err)
assertReader([]byte{1, 2, 3, 4, 5}, 10, rd)
// must close pack reader before can request it again
_, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected cache load call")
return nil
})
assertNotOK(t, "double-reservation", err)
// close the pack reader and get it from cache
rd.Close()
rd, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected cache load call")
return nil
})
rtest.OK(t, err)
assertReader([]byte{1, 2, 3, 4, 5}, 10, rd)
// close the pack reader and remove the pack from cache, assert the pack is loaded on request
rd.Close()
c.remove(id)
rd, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error {
rtest.Equals(t, int64(10), offset)
rtest.Equals(t, 5, length)
wr.Write([]byte{1, 2, 3, 4, 5})
return nil
})
rtest.OK(t, err)
assertReader([]byte{1, 2, 3, 4, 5}, 10, rd)
}
func TestPackCacheInvalidRange(t *testing.T) {
c := newPackCache(10)
id := restic.NewRandomID()
_, err := c.get(id, -1, 1, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected cache load call")
return nil
})
assertNotOK(t, "negative offset request", err)
_, err = c.get(id, 0, 0, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected cache load call")
return nil
})
assertNotOK(t, "zero length request", err)
_, err = c.get(id, 0, -1, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected cache load call")
return nil
})
assertNotOK(t, "negative length", err)
}
func TestPackCacheCapacity(t *testing.T) {
c := newPackCache(10)
id1, id2, id3 := restic.NewRandomID(), restic.NewRandomID(), restic.NewRandomID()
// load and reserve pack1
rd1, err := c.get(id1, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1, 2, 3, 4, 5})
return nil
})
rtest.OK(t, err)
// load and reserve pack2
_, err = c.get(id2, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1, 2, 3, 4, 5})
return nil
})
rtest.OK(t, err)
// can't load pack3 because not enough space in the cache
_, err = c.get(id3, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected cache load call")
return nil
})
assertNotOK(t, "request over capacity", err)
// release pack1 and try again
rd1.Close()
rd3, err := c.get(id3, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1, 2, 3, 4, 5})
return nil
})
rtest.OK(t, err)
// release pack3 and load pack1 (should not come from cache)
rd3.Close()
loaded := false
rd1, err = c.get(id1, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1, 2, 3, 4, 5})
loaded = true
return nil
})
rtest.OK(t, err)
rtest.Equals(t, true, loaded)
}
func TestPackCacheDownsizeRecord(t *testing.T) {
c := newPackCache(10)
id := restic.NewRandomID()
// get bigger range first
rd, err := c.get(id, 5, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1, 2, 3, 4, 5})
return nil
})
rtest.OK(t, err)
rd.Close()
// invalid "resize" requests
_, err = c.get(id, 5, 10, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
assertNotOK(t, "resize cached record", err)
// invalid before cached range request
_, err = c.get(id, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
assertNotOK(t, "before cached range request", err)
// invalid after cached range request
_, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
assertNotOK(t, "after cached range request", err)
// now get smaller "nested" range
rd, err = c.get(id, 7, 1, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
rtest.OK(t, err)
// assert expected data
buf := make([]byte, 1)
rd.ReadAt(buf, 7)
rtest.Equals(t, byte(3), buf[0])
_, err = rd.ReadAt(buf, 0)
assertNotOK(t, "read before downsized pack range", err)
_, err = rd.ReadAt(buf, 9)
assertNotOK(t, "read after downsized pack range", err)
// can't request downsized record again
_, err = c.get(id, 7, 1, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
assertNotOK(t, "double-allocation of cache record subrange", err)
// can't request another subrange of the original record
_, err = c.get(id, 6, 1, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
assertNotOK(t, "allocation of another subrange of cache record", err)
// release downsized record and assert the original is back in the cache
rd.Close()
rd, err = c.get(id, 5, 5, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
rtest.OK(t, err)
rd.Close()
}
func TestPackCacheFailedDownload(t *testing.T) {
c := newPackCache(10)
assertEmpty := func() {
rtest.Equals(t, 0, len(c.cachedPacks))
rtest.Equals(t, 10, c.capacity)
rtest.Equals(t, 0, c.reservedCapacity)
rtest.Equals(t, 0, c.allocatedCapacity)
}
_, err := c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
return errors.Errorf("expected induced test error")
})
assertNotOK(t, "not enough bytes read", err)
assertEmpty()
_, err = c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1})
return nil
})
assertNotOK(t, "not enough bytes read", err)
assertEmpty()
_, err = c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1, 2, 3, 4, 5, 6})
return nil
})
assertNotOK(t, "too many bytes read", err)
assertEmpty()
}
func TestPackCacheInvalidRequests(t *testing.T) {
c := newPackCache(10)
id := restic.NewRandomID()
//
rd, _ := c.get(id, 0, 1, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1})
return nil
})
assertNotOK(t, "remove() reserved pack", c.remove(id))
rtest.OK(t, rd.Close())
assertNotOK(t, "multiple reader Close() calls)", rd.Close())
//
rtest.OK(t, c.remove(id))
assertNotOK(t, "double remove() the same pack", c.remove(id))
}
func TestPackCacheRecord(t *testing.T) {
rd := &packCacheRecord{
offset: 10,
data: []byte{1},
}
buf := make([]byte, 1)
n, err := rd.ReadAt(buf, 10)
rtest.OK(t, err)
rtest.Equals(t, 1, n)
rtest.Equals(t, byte(1), buf[0])
_, err = rd.ReadAt(buf, 0)
assertNotOK(t, "read before loaded range", err)
_, err = rd.ReadAt(buf, 11)
assertNotOK(t, "read after loaded range", err)
_, err = rd.ReadAt(make([]byte, 2), 10)
assertNotOK(t, "read more than available data", err)
}

View File

@ -1,51 +0,0 @@
package restorer
// packHeap is a heap of packInfo references
// @see https://golang.org/pkg/container/heap/
// @see https://en.wikipedia.org/wiki/Heap_(data_structure)
type packHeap struct {
elements []*packInfo
// returns true if download of any of the files is in progress
inprogress func(files map[*fileInfo]struct{}) bool
}
func (pq *packHeap) Len() int { return len(pq.elements) }
func (pq *packHeap) Less(a, b int) bool {
packA, packB := pq.elements[a], pq.elements[b]
ap := pq.inprogress(packA.files)
bp := pq.inprogress(packB.files)
if ap && !bp {
return true
}
if packA.cost < packB.cost {
return true
}
return false
}
func (pq *packHeap) Swap(i, j int) {
pq.elements[i], pq.elements[j] = pq.elements[j], pq.elements[i]
pq.elements[i].index = i
pq.elements[j].index = j
}
func (pq *packHeap) Push(x interface{}) {
n := len(pq.elements)
item := x.(*packInfo)
item.index = n
pq.elements = append(pq.elements, item)
}
func (pq *packHeap) Pop() interface{} {
old := pq.elements
n := len(old)
item := old[n-1]
item.index = -1 // for safety
pq.elements = old[0 : n-1]
return item
}

View File

@ -1,224 +0,0 @@
package restorer
import (
"container/heap"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
)
// packQueue tracks remaining file contents restore work and decides what pack
// to download and files to write next.
//
// The packs in the queue can be in one of three states: waiting, ready and
// in-progress.
// Waiting packs are the packs that only have blobs from the "middle" of their
// corresponding files and therefore cannot be used until blobs from some other
// packs are written to the files first.
// In-progress packs are the packs that were removed from the queue by #nextPack
// and must be first returned to the queue before they are considered again.
// Ready packs are the packs can be immediately used to restore at least one
// file. Internally ready packs are kept in a heap and are ordered according
// to these criteria:
// - Packs with "head" blobs of in-progress files are considered first. The
// idea is to complete restore of in-progress files before starting restore
// of other files. This is both more intuitive and also reduces number of
// open file handles needed during restore.
// - Packs with smallest cost are considered next. Pack cost is measured in
// number of other packs required before all blobs in the pack can be used
// and the pack can be removed from the pack cache.
// For example, consisder a file that requires two blobs, blob1 from pack1
// and blob2 from pack2. The cost of pack2 is 1, because blob2 cannot be
// used before blob1 is available. The higher the cost, the longer the pack
// must be cached locally to avoid redownload.
//
// Pack queue implementation is NOT thread safe. All pack queue methods must
// be called from single gorouting AND packInfo and fileInfo instances must
// be updated synchronously from the same gorouting.
type packQueue struct {
idx filePackTraverser
packs map[restic.ID]*packInfo // waiting and ready packs
inprogress map[*packInfo]struct{} // inprogress packs
heap *packHeap // heap of ready packs
}
func newPackQueue(idx filePackTraverser, files []*fileInfo, inprogress func(files map[*fileInfo]struct{}) bool) (*packQueue, error) {
packs := make(map[restic.ID]*packInfo) // all packs
// create packInfo from fileInfo
for _, file := range files {
err := idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool {
pack, ok := packs[packID]
if !ok {
pack = &packInfo{
id: packID,
index: -1,
files: make(map[*fileInfo]struct{}),
}
packs[packID] = pack
}
pack.files[file] = struct{}{}
pack.cost += packIdx
return true // keep going
})
if err != nil {
// repository index is messed up, can't do anything
return nil, err
}
}
// create packInfo heap
pheap := &packHeap{inprogress: inprogress}
headPacks := restic.NewIDSet()
for _, file := range files {
idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool {
if !headPacks.Has(packID) {
headPacks.Insert(packID)
pack := packs[packID]
pack.index = len(pheap.elements)
pheap.elements = append(pheap.elements, pack)
}
return false // only first pack
})
}
heap.Init(pheap)
return &packQueue{idx: idx, packs: packs, heap: pheap, inprogress: make(map[*packInfo]struct{})}, nil
}
// isEmpty returns true if the queue is empty, i.e. there are no more packs to
// download and files to write to.
func (h *packQueue) isEmpty() bool {
return len(h.packs) == 0 && len(h.inprogress) == 0
}
// nextPack returns next ready pack and corresponding files ready for download
// and processing. The returned pack and the files are marked as "in progress"
// internally and must be first returned to the queue before they are
// considered by #nextPack again.
func (h *packQueue) nextPack() (*packInfo, []*fileInfo) {
debug.Log("Ready packs %d, outstanding packs %d, inprogress packs %d", h.heap.Len(), len(h.packs), len(h.inprogress))
if h.heap.Len() == 0 {
return nil, nil
}
pack := heap.Pop(h.heap).(*packInfo)
h.inprogress[pack] = struct{}{}
debug.Log("Popped pack %s (%d files), heap size=%d", pack.id.Str(), len(pack.files), len(h.heap.elements))
var files []*fileInfo
for file := range pack.files {
h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
debug.Log("Pack #%d %s (%d blobs) used by %s", packIdx, packID.Str(), len(packBlobs), file.location)
if pack.id == packID {
files = append(files, file)
}
return false // only interested in the fist pack here
})
}
return pack, files
}
// requeuePack conditionally adds back to the queue pack previously returned by
// #nextPack.
// If the pack is needed to restore any incomplete files, adds the pack to the
// queue and adjusts order of all affected packs in the queue. Has no effect
// if the pack is not required to restore any files.
// Returns true if the pack was added to the queue, false otherwise.
func (h *packQueue) requeuePack(pack *packInfo, success []*fileInfo, failure []*fileInfo) bool {
debug.Log("Requeue pack %s (%d/%d/%d files/success/failure)", pack.id.Str(), len(pack.files), len(success), len(failure))
// maintain inprogress pack set
delete(h.inprogress, pack)
affectedPacks := make(map[*packInfo]struct{})
affectedPacks[pack] = struct{}{} // this pack is alwats affected
// apply download success/failure to the packs
onFailure := func(file *fileInfo) {
h.idx.forEachFilePack(file, func(packInx int, packID restic.ID, _ []restic.Blob) bool {
pack := h.packs[packID]
delete(pack.files, file)
pack.cost -= packInx
affectedPacks[pack] = struct{}{}
return true // keep going
})
}
for _, file := range failure {
onFailure(file)
}
onSuccess := func(pack *packInfo, file *fileInfo) {
remove := true
h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool {
if packID.Equal(pack.id) {
// the pack has more blobs required by the file
remove = false
}
otherPack := h.packs[packID]
otherPack.cost--
affectedPacks[otherPack] = struct{}{}
return true // keep going
})
if remove {
delete(pack.files, file)
}
}
for _, file := range success {
onSuccess(pack, file)
}
// drop/update affected packs
isReady := func(affectedPack *packInfo) (ready bool) {
for file := range affectedPack.files {
h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool {
if packID.Equal(affectedPack.id) {
ready = true
}
return false // only file's first pack matters
})
if ready {
break
}
}
return ready
}
for affectedPack := range affectedPacks {
if _, inprogress := h.inprogress[affectedPack]; !inprogress {
if len(affectedPack.files) == 0 {
// drop the pack if it isn't inprogress and has no files that need it
if affectedPack.index >= 0 {
// This can't happen unless there is a bug elsewhere:
// - "current" pack isn't in the heap, hence its index must be < 0
// - "other" packs can't be ready (i.e. in heap) unless they have other files
// in which case len(affectedPack.files) must be > 0
debug.Log("corrupted ready heap: removed unexpected ready pack %s", affectedPack.id.Str())
heap.Remove(h.heap, affectedPack.index)
}
delete(h.packs, affectedPack.id)
} else {
ready := isReady(affectedPack)
switch {
case ready && affectedPack.index < 0:
heap.Push(h.heap, affectedPack)
case ready && affectedPack.index >= 0:
heap.Fix(h.heap, affectedPack.index)
case !ready && affectedPack.index >= 0:
// This can't happen unless there is a bug elsewhere:
// - "current" pack isn't in the heap, hence its index must be < 0
// - "other" packs can't have same head blobs as the "current" pack,
// hence "other" packs can't change their readiness
debug.Log("corrupted ready heap: removed unexpected waiting pack %s", affectedPack.id.Str())
heap.Remove(h.heap, affectedPack.index)
case !ready && affectedPack.index < 0:
// do nothing
}
}
}
}
return len(pack.files) > 0
}

View File

@ -1,236 +0,0 @@
package restorer
import (
"testing"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
func processPack(t *testing.T, data *TestRepo, pack *packInfo, files []*fileInfo) {
for _, file := range files {
data.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
// assert file's head pack
rtest.Equals(t, pack.id, packID)
file.blobs = file.blobs[len(packBlobs):]
return false // only interested in the head pack
})
}
}
func TestPackQueueBasic(t *testing.T) {
data := newTestRepo([]TestFile{
TestFile{
name: "file",
blobs: []TestBlob{
TestBlob{"data1", "pack1"},
TestBlob{"data2", "pack2"},
},
},
})
queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false })
rtest.OK(t, err)
// assert initial queue state
rtest.Equals(t, false, queue.isEmpty())
rtest.Equals(t, 0, queue.packs[data.packID("pack1")].cost)
rtest.Equals(t, 1, queue.packs[data.packID("pack2")].cost)
// get first pack
pack, files := queue.nextPack()
rtest.Equals(t, "pack1", data.packName(pack))
rtest.Equals(t, 1, len(files))
rtest.Equals(t, false, queue.isEmpty())
// TODO assert pack is inprogress
// can't process the second pack until the first one is processed
{
pack, files := queue.nextPack()
rtest.Equals(t, true, pack == nil)
rtest.Equals(t, true, files == nil)
rtest.Equals(t, false, queue.isEmpty())
}
// requeue the pack without processing
rtest.Equals(t, true, queue.requeuePack(pack, []*fileInfo{}, []*fileInfo{}))
rtest.Equals(t, false, queue.isEmpty())
rtest.Equals(t, 0, queue.packs[data.packID("pack1")].cost)
rtest.Equals(t, 1, queue.packs[data.packID("pack2")].cost)
// get the first pack again
pack, files = queue.nextPack()
rtest.Equals(t, "pack1", data.packName(pack))
rtest.Equals(t, 1, len(files))
rtest.Equals(t, false, queue.isEmpty())
// process the first pack and return it to the queue
processPack(t, data, pack, files)
rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{}))
rtest.Equals(t, 0, queue.packs[data.packID("pack2")].cost)
// get the second pack
pack, files = queue.nextPack()
rtest.Equals(t, "pack2", data.packName(pack))
rtest.Equals(t, 1, len(files))
rtest.Equals(t, false, queue.isEmpty())
// process the second pack and return it to the queue
processPack(t, data, pack, files)
rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{}))
// all packs processed
rtest.Equals(t, true, queue.isEmpty())
}
func TestPackQueueFailedFile(t *testing.T) {
// point of this test is to assert that enqueuePack removes
// all references to files that failed restore
data := newTestRepo([]TestFile{
TestFile{
name: "file",
blobs: []TestBlob{
TestBlob{"data1", "pack1"},
TestBlob{"data2", "pack2"},
},
},
})
queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false })
rtest.OK(t, err)
pack, files := queue.nextPack()
rtest.Equals(t, false, queue.requeuePack(pack, []*fileInfo{}, files /*failed*/))
rtest.Equals(t, true, queue.isEmpty())
}
func TestPackQueueOrderingCost(t *testing.T) {
// assert pack1 is selected before pack2:
// pack1 is ready to restore file1, pack2 is ready to restore file2
// but pack2 cannot be immediately used to restore file1
data := newTestRepo([]TestFile{
TestFile{
name: "file1",
blobs: []TestBlob{
TestBlob{"data1", "pack1"},
TestBlob{"data2", "pack2"},
},
},
TestFile{
name: "file2",
blobs: []TestBlob{
TestBlob{"data2", "pack2"},
},
},
})
queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false })
rtest.OK(t, err)
// assert initial pack costs
rtest.Equals(t, 0, data.pack(queue, "pack1").cost)
rtest.Equals(t, 0, data.pack(queue, "pack1").index) // head of the heap
rtest.Equals(t, 1, data.pack(queue, "pack2").cost)
rtest.Equals(t, 1, data.pack(queue, "pack2").index)
pack, files := queue.nextPack()
// assert selected pack and queue state
rtest.Equals(t, "pack1", data.packName(pack))
// process the pack
processPack(t, data, pack, files)
rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{}))
}
func TestPackQueueOrderingInprogress(t *testing.T) {
// finish restoring one file before starting another
data := newTestRepo([]TestFile{
TestFile{
name: "file1",
blobs: []TestBlob{
TestBlob{"data1-1", "pack1-1"},
TestBlob{"data1-2", "pack1-2"},
},
},
TestFile{
name: "file2",
blobs: []TestBlob{
TestBlob{"data2-1", "pack2-1"},
TestBlob{"data2-2", "pack2-2"},
},
},
})
var inprogress *fileInfo
queue, err := newPackQueue(data.idx, data.files, func(files map[*fileInfo]struct{}) bool {
_, found := files[inprogress]
return found
})
rtest.OK(t, err)
// first pack of a file
pack, files := queue.nextPack()
rtest.Equals(t, 1, len(files))
file := files[0]
processPack(t, data, pack, files)
inprogress = files[0]
queue.requeuePack(pack, files, []*fileInfo{})
// second pack of the same file
pack, files = queue.nextPack()
rtest.Equals(t, 1, len(files))
rtest.Equals(t, true, file == files[0]) // same file as before
processPack(t, data, pack, files)
inprogress = nil
queue.requeuePack(pack, files, []*fileInfo{})
// first pack of the second file
pack, files = queue.nextPack()
rtest.Equals(t, 1, len(files))
rtest.Equals(t, false, file == files[0]) // different file as before
}
func TestPackQueuePackMultiuse(t *testing.T) {
// the same pack is required multiple times to restore the same file
data := newTestRepo([]TestFile{
TestFile{
name: "file",
blobs: []TestBlob{
TestBlob{"data1", "pack1"},
TestBlob{"data2", "pack2"},
TestBlob{"data3", "pack1"}, // pack1 reuse, new blob
TestBlob{"data2", "pack2"}, // pack2 reuse, same blob
},
},
})
queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false })
rtest.OK(t, err)
pack, files := queue.nextPack()
rtest.Equals(t, "pack1", data.packName(pack))
rtest.Equals(t, 1, len(pack.files))
processPack(t, data, pack, files)
rtest.Equals(t, true, queue.requeuePack(pack, files, []*fileInfo{}))
pack, files = queue.nextPack()
rtest.Equals(t, "pack2", data.packName(pack))
rtest.Equals(t, 1, len(pack.files))
processPack(t, data, pack, files)
rtest.Equals(t, true, queue.requeuePack(pack, files, []*fileInfo{}))
pack, files = queue.nextPack()
rtest.Equals(t, "pack1", data.packName(pack))
processPack(t, data, pack, files)
rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{}))
pack, files = queue.nextPack()
rtest.Equals(t, "pack2", data.packName(pack))
processPack(t, data, pack, files)
rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{}))
rtest.Equals(t, true, queue.isEmpty())
}

View File

@ -206,7 +206,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error {
idx := restic.NewHardlinkIndex() idx := restic.NewHardlinkIndex()
filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), filePackTraverser{lookup: res.repo.Index().Lookup}) filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), res.repo.Index().Lookup)
// first tree pass: create directories and collect all files to restore // first tree pass: create directories and collect all files to restore
err = res.traverseTree(ctx, dst, string(filepath.Separator), *res.sn.Tree, treeVisitor{ err = res.traverseTree(ctx, dst, string(filepath.Separator), *res.sn.Tree, treeVisitor{
@ -249,7 +249,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error {
return err return err
} }
err = filerestorer.restoreFiles(ctx, func(location string, err error) { res.Error(location, err) }) err = filerestorer.restoreFiles(ctx)
if err != nil { if err != nil {
return err return err
} }

22
vendor/github.com/cespare/xxhash/LICENSE.txt generated vendored Normal file
View File

@ -0,0 +1,22 @@
Copyright (c) 2016 Caleb Spare
MIT License
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

50
vendor/github.com/cespare/xxhash/README.md generated vendored Normal file
View File

@ -0,0 +1,50 @@
# xxhash
[![GoDoc](https://godoc.org/github.com/cespare/xxhash?status.svg)](https://godoc.org/github.com/cespare/xxhash)
xxhash is a Go implementation of the 64-bit
[xxHash](http://cyan4973.github.io/xxHash/) algorithm, XXH64. This is a
high-quality hashing algorithm that is much faster than anything in the Go
standard library.
The API is very small, taking its cue from the other hashing packages in the
standard library:
$ go doc github.com/cespare/xxhash !
package xxhash // import "github.com/cespare/xxhash"
Package xxhash implements the 64-bit variant of xxHash (XXH64) as described
at http://cyan4973.github.io/xxHash/.
func New() hash.Hash64
func Sum64(b []byte) uint64
func Sum64String(s string) uint64
This implementation provides a fast pure-Go implementation and an even faster
assembly implementation for amd64.
## Benchmarks
Here are some quick benchmarks comparing the pure-Go and assembly
implementations of Sum64 against another popular Go XXH64 implementation,
[github.com/OneOfOne/xxhash](https://github.com/OneOfOne/xxhash):
| input size | OneOfOne | cespare (purego) | cespare |
| --- | --- | --- | --- |
| 5 B | 416 MB/s | 720 MB/s | 872 MB/s |
| 100 B | 3980 MB/s | 5013 MB/s | 5252 MB/s |
| 4 KB | 12727 MB/s | 12999 MB/s | 13026 MB/s |
| 10 MB | 9879 MB/s | 10775 MB/s | 10913 MB/s |
These numbers were generated with:
```
$ go test -benchtime 10s -bench '/OneOfOne,'
$ go test -tags purego -benchtime 10s -bench '/xxhash,'
$ go test -benchtime 10s -bench '/xxhash,'
```
## Projects using this package
- [InfluxDB](https://github.com/influxdata/influxdb)
- [Prometheus](https://github.com/prometheus/prometheus)

6
vendor/github.com/cespare/xxhash/go.mod generated vendored Normal file
View File

@ -0,0 +1,6 @@
module github.com/cespare/xxhash
require (
github.com/OneOfOne/xxhash v1.2.2
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
)

4
vendor/github.com/cespare/xxhash/go.sum generated vendored Normal file
View File

@ -0,0 +1,4 @@
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=

14
vendor/github.com/cespare/xxhash/rotate.go generated vendored Normal file
View File

@ -0,0 +1,14 @@
// +build !go1.9
package xxhash
// TODO(caleb): After Go 1.10 comes out, remove this fallback code.
func rol1(x uint64) uint64 { return (x << 1) | (x >> (64 - 1)) }
func rol7(x uint64) uint64 { return (x << 7) | (x >> (64 - 7)) }
func rol11(x uint64) uint64 { return (x << 11) | (x >> (64 - 11)) }
func rol12(x uint64) uint64 { return (x << 12) | (x >> (64 - 12)) }
func rol18(x uint64) uint64 { return (x << 18) | (x >> (64 - 18)) }
func rol23(x uint64) uint64 { return (x << 23) | (x >> (64 - 23)) }
func rol27(x uint64) uint64 { return (x << 27) | (x >> (64 - 27)) }
func rol31(x uint64) uint64 { return (x << 31) | (x >> (64 - 31)) }

14
vendor/github.com/cespare/xxhash/rotate19.go generated vendored Normal file
View File

@ -0,0 +1,14 @@
// +build go1.9
package xxhash
import "math/bits"
func rol1(x uint64) uint64 { return bits.RotateLeft64(x, 1) }
func rol7(x uint64) uint64 { return bits.RotateLeft64(x, 7) }
func rol11(x uint64) uint64 { return bits.RotateLeft64(x, 11) }
func rol12(x uint64) uint64 { return bits.RotateLeft64(x, 12) }
func rol18(x uint64) uint64 { return bits.RotateLeft64(x, 18) }
func rol23(x uint64) uint64 { return bits.RotateLeft64(x, 23) }
func rol27(x uint64) uint64 { return bits.RotateLeft64(x, 27) }
func rol31(x uint64) uint64 { return bits.RotateLeft64(x, 31) }

168
vendor/github.com/cespare/xxhash/xxhash.go generated vendored Normal file
View File

@ -0,0 +1,168 @@
// Package xxhash implements the 64-bit variant of xxHash (XXH64) as described
// at http://cyan4973.github.io/xxHash/.
package xxhash
import (
"encoding/binary"
"hash"
)
const (
prime1 uint64 = 11400714785074694791
prime2 uint64 = 14029467366897019727
prime3 uint64 = 1609587929392839161
prime4 uint64 = 9650029242287828579
prime5 uint64 = 2870177450012600261
)
// NOTE(caleb): I'm using both consts and vars of the primes. Using consts where
// possible in the Go code is worth a small (but measurable) performance boost
// by avoiding some MOVQs. Vars are needed for the asm and also are useful for
// convenience in the Go code in a few places where we need to intentionally
// avoid constant arithmetic (e.g., v1 := prime1 + prime2 fails because the
// result overflows a uint64).
var (
prime1v = prime1
prime2v = prime2
prime3v = prime3
prime4v = prime4
prime5v = prime5
)
type xxh struct {
v1 uint64
v2 uint64
v3 uint64
v4 uint64
total int
mem [32]byte
n int // how much of mem is used
}
// New creates a new hash.Hash64 that implements the 64-bit xxHash algorithm.
func New() hash.Hash64 {
var x xxh
x.Reset()
return &x
}
func (x *xxh) Reset() {
x.n = 0
x.total = 0
x.v1 = prime1v + prime2
x.v2 = prime2
x.v3 = 0
x.v4 = -prime1v
}
func (x *xxh) Size() int { return 8 }
func (x *xxh) BlockSize() int { return 32 }
// Write adds more data to x. It always returns len(b), nil.
func (x *xxh) Write(b []byte) (n int, err error) {
n = len(b)
x.total += len(b)
if x.n+len(b) < 32 {
// This new data doesn't even fill the current block.
copy(x.mem[x.n:], b)
x.n += len(b)
return
}
if x.n > 0 {
// Finish off the partial block.
copy(x.mem[x.n:], b)
x.v1 = round(x.v1, u64(x.mem[0:8]))
x.v2 = round(x.v2, u64(x.mem[8:16]))
x.v3 = round(x.v3, u64(x.mem[16:24]))
x.v4 = round(x.v4, u64(x.mem[24:32]))
b = b[32-x.n:]
x.n = 0
}
if len(b) >= 32 {
// One or more full blocks left.
b = writeBlocks(x, b)
}
// Store any remaining partial block.
copy(x.mem[:], b)
x.n = len(b)
return
}
func (x *xxh) Sum(b []byte) []byte {
s := x.Sum64()
return append(
b,
byte(s>>56),
byte(s>>48),
byte(s>>40),
byte(s>>32),
byte(s>>24),
byte(s>>16),
byte(s>>8),
byte(s),
)
}
func (x *xxh) Sum64() uint64 {
var h uint64
if x.total >= 32 {
v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4
h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4)
h = mergeRound(h, v1)
h = mergeRound(h, v2)
h = mergeRound(h, v3)
h = mergeRound(h, v4)
} else {
h = x.v3 + prime5
}
h += uint64(x.total)
i, end := 0, x.n
for ; i+8 <= end; i += 8 {
k1 := round(0, u64(x.mem[i:i+8]))
h ^= k1
h = rol27(h)*prime1 + prime4
}
if i+4 <= end {
h ^= uint64(u32(x.mem[i:i+4])) * prime1
h = rol23(h)*prime2 + prime3
i += 4
}
for i < end {
h ^= uint64(x.mem[i]) * prime5
h = rol11(h) * prime1
i++
}
h ^= h >> 33
h *= prime2
h ^= h >> 29
h *= prime3
h ^= h >> 32
return h
}
func u64(b []byte) uint64 { return binary.LittleEndian.Uint64(b) }
func u32(b []byte) uint32 { return binary.LittleEndian.Uint32(b) }
func round(acc, input uint64) uint64 {
acc += input * prime2
acc = rol31(acc)
acc *= prime1
return acc
}
func mergeRound(acc, val uint64) uint64 {
val = round(0, val)
acc ^= val
acc = acc*prime1 + prime4
return acc
}

12
vendor/github.com/cespare/xxhash/xxhash_amd64.go generated vendored Normal file
View File

@ -0,0 +1,12 @@
// +build !appengine
// +build gc
// +build !purego
package xxhash
// Sum64 computes the 64-bit xxHash digest of b.
//
//go:noescape
func Sum64(b []byte) uint64
func writeBlocks(x *xxh, b []byte) []byte

233
vendor/github.com/cespare/xxhash/xxhash_amd64.s generated vendored Normal file
View File

@ -0,0 +1,233 @@
// +build !appengine
// +build gc
// +build !purego
#include "textflag.h"
// Register allocation:
// AX h
// CX pointer to advance through b
// DX n
// BX loop end
// R8 v1, k1
// R9 v2
// R10 v3
// R11 v4
// R12 tmp
// R13 prime1v
// R14 prime2v
// R15 prime4v
// round reads from and advances the buffer pointer in CX.
// It assumes that R13 has prime1v and R14 has prime2v.
#define round(r) \
MOVQ (CX), R12 \
ADDQ $8, CX \
IMULQ R14, R12 \
ADDQ R12, r \
ROLQ $31, r \
IMULQ R13, r
// mergeRound applies a merge round on the two registers acc and val.
// It assumes that R13 has prime1v, R14 has prime2v, and R15 has prime4v.
#define mergeRound(acc, val) \
IMULQ R14, val \
ROLQ $31, val \
IMULQ R13, val \
XORQ val, acc \
IMULQ R13, acc \
ADDQ R15, acc
// func Sum64(b []byte) uint64
TEXT ·Sum64(SB), NOSPLIT, $0-32
// Load fixed primes.
MOVQ ·prime1v(SB), R13
MOVQ ·prime2v(SB), R14
MOVQ ·prime4v(SB), R15
// Load slice.
MOVQ b_base+0(FP), CX
MOVQ b_len+8(FP), DX
LEAQ (CX)(DX*1), BX
// The first loop limit will be len(b)-32.
SUBQ $32, BX
// Check whether we have at least one block.
CMPQ DX, $32
JLT noBlocks
// Set up initial state (v1, v2, v3, v4).
MOVQ R13, R8
ADDQ R14, R8
MOVQ R14, R9
XORQ R10, R10
XORQ R11, R11
SUBQ R13, R11
// Loop until CX > BX.
blockLoop:
round(R8)
round(R9)
round(R10)
round(R11)
CMPQ CX, BX
JLE blockLoop
MOVQ R8, AX
ROLQ $1, AX
MOVQ R9, R12
ROLQ $7, R12
ADDQ R12, AX
MOVQ R10, R12
ROLQ $12, R12
ADDQ R12, AX
MOVQ R11, R12
ROLQ $18, R12
ADDQ R12, AX
mergeRound(AX, R8)
mergeRound(AX, R9)
mergeRound(AX, R10)
mergeRound(AX, R11)
JMP afterBlocks
noBlocks:
MOVQ ·prime5v(SB), AX
afterBlocks:
ADDQ DX, AX
// Right now BX has len(b)-32, and we want to loop until CX > len(b)-8.
ADDQ $24, BX
CMPQ CX, BX
JG fourByte
wordLoop:
// Calculate k1.
MOVQ (CX), R8
ADDQ $8, CX
IMULQ R14, R8
ROLQ $31, R8
IMULQ R13, R8
XORQ R8, AX
ROLQ $27, AX
IMULQ R13, AX
ADDQ R15, AX
CMPQ CX, BX
JLE wordLoop
fourByte:
ADDQ $4, BX
CMPQ CX, BX
JG singles
MOVL (CX), R8
ADDQ $4, CX
IMULQ R13, R8
XORQ R8, AX
ROLQ $23, AX
IMULQ R14, AX
ADDQ ·prime3v(SB), AX
singles:
ADDQ $4, BX
CMPQ CX, BX
JGE finalize
singlesLoop:
MOVBQZX (CX), R12
ADDQ $1, CX
IMULQ ·prime5v(SB), R12
XORQ R12, AX
ROLQ $11, AX
IMULQ R13, AX
CMPQ CX, BX
JL singlesLoop
finalize:
MOVQ AX, R12
SHRQ $33, R12
XORQ R12, AX
IMULQ R14, AX
MOVQ AX, R12
SHRQ $29, R12
XORQ R12, AX
IMULQ ·prime3v(SB), AX
MOVQ AX, R12
SHRQ $32, R12
XORQ R12, AX
MOVQ AX, ret+24(FP)
RET
// writeBlocks uses the same registers as above except that it uses AX to store
// the x pointer.
// func writeBlocks(x *xxh, b []byte) []byte
TEXT ·writeBlocks(SB), NOSPLIT, $0-56
// Load fixed primes needed for round.
MOVQ ·prime1v(SB), R13
MOVQ ·prime2v(SB), R14
// Load slice.
MOVQ b_base+8(FP), CX
MOVQ CX, ret_base+32(FP) // initialize return base pointer; see NOTE below
MOVQ b_len+16(FP), DX
LEAQ (CX)(DX*1), BX
SUBQ $32, BX
// Load vN from x.
MOVQ x+0(FP), AX
MOVQ 0(AX), R8 // v1
MOVQ 8(AX), R9 // v2
MOVQ 16(AX), R10 // v3
MOVQ 24(AX), R11 // v4
// We don't need to check the loop condition here; this function is
// always called with at least one block of data to process.
blockLoop:
round(R8)
round(R9)
round(R10)
round(R11)
CMPQ CX, BX
JLE blockLoop
// Copy vN back to x.
MOVQ R8, 0(AX)
MOVQ R9, 8(AX)
MOVQ R10, 16(AX)
MOVQ R11, 24(AX)
// Construct return slice.
// NOTE: It's important that we don't construct a slice that has a base
// pointer off the end of the original slice, as in Go 1.7+ this will
// cause runtime crashes. (See discussion in, for example,
// https://github.com/golang/go/issues/16772.)
// Therefore, we calculate the length/cap first, and if they're zero, we
// keep the old base. This is what the compiler does as well if you
// write code like
// b = b[len(b):]
// New length is 32 - (CX - BX) -> BX+32 - CX.
ADDQ $32, BX
SUBQ CX, BX
JZ afterSetBase
MOVQ CX, ret_base+32(FP)
afterSetBase:
MOVQ BX, ret_len+40(FP)
MOVQ BX, ret_cap+48(FP) // set cap == len
RET

75
vendor/github.com/cespare/xxhash/xxhash_other.go generated vendored Normal file
View File

@ -0,0 +1,75 @@
// +build !amd64 appengine !gc purego
package xxhash
// Sum64 computes the 64-bit xxHash digest of b.
func Sum64(b []byte) uint64 {
// A simpler version would be
// x := New()
// x.Write(b)
// return x.Sum64()
// but this is faster, particularly for small inputs.
n := len(b)
var h uint64
if n >= 32 {
v1 := prime1v + prime2
v2 := prime2
v3 := uint64(0)
v4 := -prime1v
for len(b) >= 32 {
v1 = round(v1, u64(b[0:8:len(b)]))
v2 = round(v2, u64(b[8:16:len(b)]))
v3 = round(v3, u64(b[16:24:len(b)]))
v4 = round(v4, u64(b[24:32:len(b)]))
b = b[32:len(b):len(b)]
}
h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4)
h = mergeRound(h, v1)
h = mergeRound(h, v2)
h = mergeRound(h, v3)
h = mergeRound(h, v4)
} else {
h = prime5
}
h += uint64(n)
i, end := 0, len(b)
for ; i+8 <= end; i += 8 {
k1 := round(0, u64(b[i:i+8:len(b)]))
h ^= k1
h = rol27(h)*prime1 + prime4
}
if i+4 <= end {
h ^= uint64(u32(b[i:i+4:len(b)])) * prime1
h = rol23(h)*prime2 + prime3
i += 4
}
for ; i < end; i++ {
h ^= uint64(b[i]) * prime5
h = rol11(h) * prime1
}
h ^= h >> 33
h *= prime2
h ^= h >> 29
h *= prime3
h ^= h >> 32
return h
}
func writeBlocks(x *xxh, b []byte) []byte {
v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4
for len(b) >= 32 {
v1 = round(v1, u64(b[0:8:len(b)]))
v2 = round(v2, u64(b[8:16:len(b)]))
v3 = round(v3, u64(b[16:24:len(b)]))
v4 = round(v4, u64(b[24:32:len(b)]))
b = b[32:len(b):len(b)]
}
x.v1, x.v2, x.v3, x.v4 = v1, v2, v3, v4
return b
}

10
vendor/github.com/cespare/xxhash/xxhash_safe.go generated vendored Normal file
View File

@ -0,0 +1,10 @@
// +build appengine
// This file contains the safe implementations of otherwise unsafe-using code.
package xxhash
// Sum64String computes the 64-bit xxHash digest of s.
func Sum64String(s string) uint64 {
return Sum64([]byte(s))
}

30
vendor/github.com/cespare/xxhash/xxhash_unsafe.go generated vendored Normal file
View File

@ -0,0 +1,30 @@
// +build !appengine
// This file encapsulates usage of unsafe.
// xxhash_safe.go contains the safe implementations.
package xxhash
import (
"reflect"
"unsafe"
)
// Sum64String computes the 64-bit xxHash digest of s.
// It may be faster than Sum64([]byte(s)) by avoiding a copy.
//
// TODO(caleb): Consider removing this if an optimization is ever added to make
// it unnecessary: https://golang.org/issue/2205.
//
// TODO(caleb): We still have a function call; we could instead write Go/asm
// copies of Sum64 for strings to squeeze out a bit more speed.
func Sum64String(s string) uint64 {
// See https://groups.google.com/d/msg/golang-nuts/dcjzJy-bSpw/tcZYBzQqAQAJ
// for some discussion about this unsafe conversion.
var b []byte
bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
bh.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data
bh.Len = len(s)
bh.Cap = len(s)
return Sum64(b)
}

2
vendor/modules.txt vendored
View File

@ -20,6 +20,8 @@ github.com/Azure/go-autorest/logger
github.com/Azure/go-autorest/tracing github.com/Azure/go-autorest/tracing
# github.com/cenkalti/backoff v2.1.1+incompatible # github.com/cenkalti/backoff v2.1.1+incompatible
github.com/cenkalti/backoff github.com/cenkalti/backoff
# github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash
# github.com/cpuguy83/go-md2man v1.0.10 # github.com/cpuguy83/go-md2man v1.0.10
github.com/cpuguy83/go-md2man/md2man github.com/cpuguy83/go-md2man/md2man
# github.com/dgrijalva/jwt-go v3.2.0+incompatible # github.com/dgrijalva/jwt-go v3.2.0+incompatible