diff --git a/changelog/unreleased/issue-1605 b/changelog/unreleased/issue-1605 new file mode 100644 index 000000000..bc78a6d98 --- /dev/null +++ b/changelog/unreleased/issue-1605 @@ -0,0 +1,11 @@ +Enhancement: Concurrent restore + +This change significantly improves restore performance, especially +when using high-latency remote repositories like B2. + +The implementation now uses several concurrent threads to download and process +multiple remote files concurrently. To further reduce restore time, each remote +file is downloaded using a single repository request. + +https://github.com/restic/restic/issues/1605 +https://github.com/restic/restic/pull/1719 diff --git a/cmd/restic/cmd_restore.go b/cmd/restic/cmd_restore.go index 4bf59c06f..477192eab 100644 --- a/cmd/restic/cmd_restore.go +++ b/cmd/restic/cmd_restore.go @@ -113,8 +113,8 @@ func runRestore(opts RestoreOptions, gopts GlobalOptions, args []string) error { } totalErrors := 0 - res.Error = func(dir string, node *restic.Node, err error) error { - Warnf("ignoring error for %s: %s\n", dir, err) + res.Error = func(location string, err error) error { + Warnf("ignoring error for %s: %s\n", location, err) totalErrors++ return nil } diff --git a/go.mod b/go.mod index d2e323005..936375e55 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/golang/protobuf v1.2.0 // indirect github.com/google/go-cmp v0.2.0 github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c // indirect + github.com/hashicorp/golang-lru v0.5.0 github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jtolds/gls v4.2.1+incompatible // indirect github.com/juju/ratelimit v1.0.1 diff --git a/go.sum b/go.sum index 40d16e72c..e49fc4ca5 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,8 @@ github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c 
h1:16eHWuMGvCjSfgRJKqIzapE78onvvTbdi1rMkU00lZw= github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpRVWLVmUEE= diff --git a/internal/restic/node.go b/internal/restic/node.go index c8b089791..638306eac 100644 --- a/internal/restic/node.go +++ b/internal/restic/node.go @@ -135,7 +135,7 @@ func (node Node) GetExtendedAttribute(a string) []byte { } // CreateAt creates the node at the given path but does NOT restore node meta data. -func (node *Node) CreateAt(ctx context.Context, path string, repo Repository, idx *HardlinkIndex) error { +func (node *Node) CreateAt(ctx context.Context, path string, repo Repository) error { debug.Log("create node %v at %v", node.Name, path) switch node.Type { @@ -144,7 +144,7 @@ func (node *Node) CreateAt(ctx context.Context, path string, repo Repository, id return err } case "file": - if err := node.createFileAt(ctx, path, repo, idx); err != nil { + if err := node.createFileAt(ctx, path, repo); err != nil { return err } case "symlink": @@ -259,18 +259,7 @@ func (node Node) createDirAt(path string) error { return nil } -func (node Node) createFileAt(ctx context.Context, path string, repo Repository, idx *HardlinkIndex) error { - if node.Links > 1 && idx.Has(node.Inode, node.DeviceID) { - if err := fs.Remove(path); !os.IsNotExist(err) { - return errors.Wrap(err, "RemoveCreateHardlink") - } - err := fs.Link(idx.GetFilename(node.Inode, node.DeviceID), path) - if err != nil { - return errors.Wrap(err, "CreateHardlink") - } - return 
nil - } - +func (node Node) createFileAt(ctx context.Context, path string, repo Repository) error { f, err := fs.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) if err != nil { return errors.Wrap(err, "OpenFile") @@ -287,10 +276,6 @@ func (node Node) createFileAt(ctx context.Context, path string, repo Repository, return errors.Wrap(closeErr, "Close") } - if node.Links > 1 { - idx.Add(node.Inode, node.DeviceID, path) - } - return nil } diff --git a/internal/restic/node_test.go b/internal/restic/node_test.go index e262a36c6..f12353a0a 100644 --- a/internal/restic/node_test.go +++ b/internal/restic/node_test.go @@ -177,11 +177,9 @@ func TestNodeRestoreAt(t *testing.T) { } }() - idx := restic.NewHardlinkIndex() - for _, test := range nodeTests { nodePath := filepath.Join(tempdir, test.Name) - rtest.OK(t, test.CreateAt(context.TODO(), nodePath, nil, idx)) + rtest.OK(t, test.CreateAt(context.TODO(), nodePath, nil)) rtest.OK(t, test.RestoreMetadata(nodePath)) if test.Type == "symlink" && runtime.GOOS == "windows" { diff --git a/internal/restorer/doc.go b/internal/restorer/doc.go new file mode 100644 index 000000000..b3583c728 --- /dev/null +++ b/internal/restorer/doc.go @@ -0,0 +1,33 @@ +// Package restorer contains code to restore data from a repository. +// +// The Restorer tries to keep the number of backend requests minimal. It does +// this by downloading all required blobs of a pack file with a single backend +// request and avoiding repeated downloads of the same pack. In addition, +// several pack files are fetched concurrently. 
+// +// Here is high-level pseudo-code of the how the Restorer attempts to achieve +// these goals: +// +// while there are packs to process +// choose a pack to process [1] +// get the pack from the backend or cache [2] +// write pack blobs to the files that need them [3] +// if not all pack blobs were used +// cache the pack for future use [4] +// +// Pack download and processing (steps [2] - [4]) runs on multiple concurrent +// Goroutines. The Restorer runs all steps [2]-[4] sequentially on the same +// Goroutine. +// +// Before a pack is downloaded (step [2]), the required space is "reserved" in +// the pack cache. Actual download uses single backend request to get all +// required pack blobs. This may download blobs that are not needed, but we +// assume it'll still be faster than getting individual blobs. +// +// Target files are written (step [3]) in the "right" order, first file blob +// first, then second, then third and so on. Blob write order implies that some +// pack blobs may not be immediately used, i.e. they are "out of order" for +// their respective target files. Packs with unused blobs are cached (step +// [4]). The cache has capacity limit and may purge packs before they are fully +// used, in which case the purged packs will need to be re-downloaded. 
+package restorer diff --git a/internal/restorer/filepacktraverser.go b/internal/restorer/filepacktraverser.go new file mode 100644 index 000000000..bba61e0f9 --- /dev/null +++ b/internal/restorer/filepacktraverser.go @@ -0,0 +1,52 @@ +package restorer + +import ( + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/restic" +) + +type filePackTraverser struct { + lookup func(restic.ID, restic.BlobType) ([]restic.PackedBlob, bool) +} + +// iterates over all remaining packs of the file +func (t *filePackTraverser) forEachFilePack(file *fileInfo, fn func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool) error { + if len(file.blobs) == 0 { + return nil + } + + getBlobPack := func(blobID restic.ID) (restic.PackedBlob, error) { + packs, found := t.lookup(blobID, restic.DataBlob) + if !found { + return restic.PackedBlob{}, errors.Errorf("Unknown blob %s", blobID.String()) + } + // TODO which pack to use if multiple packs have the blob? + // MUST return the same pack for the same blob during the same execution + return packs[0], nil + } + + var prevPackID restic.ID + var prevPackBlobs []restic.Blob + packIdx := 0 + for _, blobID := range file.blobs { + packedBlob, err := getBlobPack(blobID) + if err != nil { + return err + } + if !prevPackID.IsNull() && prevPackID != packedBlob.PackID { + if !fn(packIdx, prevPackID, prevPackBlobs) { + return nil + } + packIdx++ + } + if prevPackID != packedBlob.PackID { + prevPackID = packedBlob.PackID + prevPackBlobs = make([]restic.Blob, 0) + } + prevPackBlobs = append(prevPackBlobs, packedBlob.Blob) + } + if len(prevPackBlobs) > 0 { + fn(packIdx, prevPackID, prevPackBlobs) + } + return nil +} diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go new file mode 100644 index 000000000..4baf9b567 --- /dev/null +++ b/internal/restorer/filerestorer.go @@ -0,0 +1,324 @@ +package restorer + +import ( + "context" + "io" + "path/filepath" + + 
"github.com/restic/restic/internal/crypto" + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/restic" +) + +// TODO if a blob is corrupt, there may be good blob copies in other packs +// TODO evaluate if it makes sense to split download and processing workers +// pro: can (slowly) read network and decrypt/write files concurrently +// con: each worker needs to keep one pack in memory +// TODO evaluate memory footprint for larger repositories, say 10M packs/10M files +// TODO consider replacing pack file cache with blob cache +// TODO avoid decrypting the same blob multiple times +// TODO evaluate disabled debug logging overhead for large repositories + +const ( + workerCount = 8 + + // max number of open output file handles + filesWriterCount = 32 + + // estimated average pack size used to calculate pack cache capacity + averagePackSize = 5 * 1024 * 1024 + + // pack cache capacity should support at least one cached pack per worker + // allow space for extra 5 packs for actual caching + packCacheCapacity = (workerCount + 5) * averagePackSize +) + +// information about regular file being restored +type fileInfo struct { + location string // file on local filesystem relative to restorer basedir + blobs []restic.ID // remaining blobs of the file +} + +// information about a data pack required to restore one or more files +type packInfo struct { + // the pack id + id restic.ID + + // set of files that use blobs from this pack + files map[*fileInfo]struct{} + + // number of other packs that must be downloaded before all blobs in this pack can be used + cost int + + // used by packHeap + index int +} + +// fileRestorer restores set of files +type fileRestorer struct { + key *crypto.Key + idx filePackTraverser + packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error + + packCache *packCache // pack cache + filesWriter *filesWriter // file 
write + + dst string + files []*fileInfo +} + +func newFileRestorer(dst string, packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error, key *crypto.Key, idx filePackTraverser) *fileRestorer { + return &fileRestorer{ + packLoader: packLoader, + key: key, + idx: idx, + filesWriter: newFilesWriter(filesWriterCount), + packCache: newPackCache(packCacheCapacity), + dst: dst, + } +} + +func (r *fileRestorer) addFile(location string, content restic.IDs) { + r.files = append(r.files, &fileInfo{location: location, blobs: content}) +} + +func (r *fileRestorer) targetPath(location string) string { + return filepath.Join(r.dst, location) +} + +// used to pass information among workers (wish golang channels allowed multivalues) +type processingInfo struct { + pack *packInfo + files map[*fileInfo]error +} + +func (r *fileRestorer) restoreFiles(ctx context.Context, onError func(path string, err error)) error { + // TODO conditionally enable when debug log is on + // for _, file := range r.files { + // dbgmsg := file.location + ": " + // r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { + // if packIdx > 0 { + // dbgmsg += ", " + // } + // dbgmsg += "pack{id=" + packID.Str() + ", blobs: " + // for blobIdx, blob := range packBlobs { + // if blobIdx > 0 { + // dbgmsg += ", " + // } + // dbgmsg += blob.ID.Str() + // } + // dbgmsg += "}" + // return true // keep going + // }) + // debug.Log(dbgmsg) + // } + + inprogress := make(map[*fileInfo]struct{}) + queue, err := newPackQueue(r.idx, r.files, func(files map[*fileInfo]struct{}) bool { + for file := range files { + if _, found := inprogress[file]; found { + return true + } + } + return false + }) + if err != nil { + return err + } + + // workers + downloadCh := make(chan processingInfo) + feedbackCh := make(chan processingInfo) + + defer close(downloadCh) + defer close(feedbackCh) + + worker := func() { + for { + select { + case 
<-ctx.Done(): + return + case request, ok := <-downloadCh: + if !ok { + return // channel closed + } + rd, err := r.downloadPack(ctx, request.pack) + if err == nil { + r.processPack(ctx, request, rd) + } else { + // mark all files as failed + for file := range request.files { + request.files[file] = err + } + } + feedbackCh <- request + } + } + } + for i := 0; i < workerCount; i++ { + go worker() + } + + processFeedback := func(pack *packInfo, ferrors map[*fileInfo]error) { + // update files blobIdx + // must do it here to avoid race among worker and processing feedback threads + var success []*fileInfo + var failure []*fileInfo + for file, ferr := range ferrors { + target := r.targetPath(file.location) + if ferr != nil { + onError(file.location, ferr) + r.filesWriter.close(target) + delete(inprogress, file) + failure = append(failure, file) + } else { + r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { + file.blobs = file.blobs[len(packBlobs):] + return false // only interesed in the first pack + }) + if len(file.blobs) == 0 { + r.filesWriter.close(target) + delete(inprogress, file) + } + success = append(success, file) + } + } + // update the queue and requeueu the pack as necessary + if !queue.requeuePack(pack, success, failure) { + r.packCache.remove(pack.id) + debug.Log("Purged used up pack %s from pack cache", pack.id.Str()) + } + } + + // the main restore loop + for !queue.isEmpty() { + debug.Log("-----------------------------------") + pack, files := queue.nextPack() + if pack != nil { + ferrors := make(map[*fileInfo]error) + for _, file := range files { + ferrors[file] = nil + inprogress[file] = struct{}{} + } + select { + case <-ctx.Done(): + return ctx.Err() + case downloadCh <- processingInfo{pack: pack, files: ferrors}: + debug.Log("Scheduled download pack %s (%d files)", pack.id.Str(), len(files)) + case feedback := <-feedbackCh: + queue.requeuePack(pack, []*fileInfo{}, []*fileInfo{}) // didn't use the 
pack during this iteration + processFeedback(feedback.pack, feedback.files) + } + } else { + select { + case <-ctx.Done(): + return ctx.Err() + case feedback := <-feedbackCh: + processFeedback(feedback.pack, feedback.files) + } + } + } + + return nil +} + +func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) (readerAtCloser, error) { + const MaxInt64 = 1<<63 - 1 // odd Go does not have this predefined somewhere + + // calculate pack byte range + start, end := int64(MaxInt64), int64(0) + for file := range pack.files { + r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { + if packID.Equal(pack.id) { + for _, blob := range packBlobs { + if start > int64(blob.Offset) { + start = int64(blob.Offset) + } + if end < int64(blob.Offset+blob.Length) { + end = int64(blob.Offset + blob.Length) + } + } + } + + return true // keep going + }) + } + + packReader, err := r.packCache.get(pack.id, start, int(end-start), func(offset int64, length int, wr io.WriteSeeker) error { + h := restic.Handle{Type: restic.DataFile, Name: pack.id.String()} + return r.packLoader(ctx, h, length, offset, func(rd io.Reader) error { + // reset the file in case of a download retry + _, err := wr.Seek(0, io.SeekStart) + if err != nil { + return err + } + + len, err := io.Copy(wr, rd) + if err != nil { + return err + } + if len != int64(length) { + return errors.Errorf("unexpected pack size: expected %d but got %d", length, len) + } + + return nil + }) + }) + if err != nil { + return nil, err + } + + return packReader, nil +} + +func (r *fileRestorer) processPack(ctx context.Context, request processingInfo, rd readerAtCloser) { + defer rd.Close() + + for file := range request.files { + target := r.targetPath(file.location) + r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { + for _, blob := range packBlobs { + debug.Log("Writing blob %s (%d bytes) from pack %s to %s", blob.ID.Str(), blob.Length, 
packID.Str(), file.location) + buf, err := r.loadBlob(rd, blob) + if err == nil { + err = r.filesWriter.writeToFile(target, buf) + } + if err != nil { + request.files[file] = err + break // could not restore the file + } + } + return false + }) + } +} + +func (r *fileRestorer) loadBlob(rd io.ReaderAt, blob restic.Blob) ([]byte, error) { + // TODO reconcile with Repository#loadBlob implementation + + buf := make([]byte, blob.Length) + + n, err := rd.ReadAt(buf, int64(blob.Offset)) + if err != nil { + return nil, err + } + + if n != int(blob.Length) { + return nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blob.ID.Str(), blob.Length, n) + } + + // decrypt + nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():] + plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil) + if err != nil { + return nil, errors.Errorf("decrypting blob %v failed: %v", blob.ID, err) + } + + // check hash + if !restic.Hash(plaintext).Equal(blob.ID) { + return nil, errors.Errorf("blob %v returned invalid hash", blob.ID) + } + + return plaintext, nil +} diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go new file mode 100644 index 000000000..dd022e9d4 --- /dev/null +++ b/internal/restorer/filerestorer_test.go @@ -0,0 +1,212 @@ +package restorer + +import ( + "bytes" + "context" + "io" + "io/ioutil" + "testing" + + "github.com/restic/restic/internal/crypto" + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/restic" + rtest "github.com/restic/restic/internal/test" +) + +type TestBlob struct { + data string + pack string +} + +type TestFile struct { + name string + blobs []TestBlob +} + +type TestRepo struct { + key *crypto.Key + + // pack names and ids + packsNameToID map[string]restic.ID + packsIDToName map[restic.ID]string + packsIDToData map[restic.ID][]byte + + // blobs and files + blobs map[restic.ID][]restic.PackedBlob + files []*fileInfo + 
filesPathToContent map[string]string + + // + loader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error + + // + idx filePackTraverser +} + +func (i *TestRepo) Lookup(blobID restic.ID, _ restic.BlobType) ([]restic.PackedBlob, bool) { + packs, found := i.blobs[blobID] + return packs, found +} + +func (i *TestRepo) packName(pack *packInfo) string { + return i.packsIDToName[pack.id] +} + +func (i *TestRepo) packID(name string) restic.ID { + return i.packsNameToID[name] +} + +func (i *TestRepo) pack(queue *packQueue, name string) *packInfo { + id := i.packsNameToID[name] + return queue.packs[id] +} + +func (i *TestRepo) fileContent(file *fileInfo) string { + return i.filesPathToContent[file.location] +} + +func newTestRepo(content []TestFile) *TestRepo { + type Pack struct { + name string + data []byte + blobs map[restic.ID]restic.Blob + } + packs := make(map[string]Pack) + + key := crypto.NewRandomKey() + seal := func(data []byte) []byte { + ciphertext := restic.NewBlobBuffer(len(data)) + ciphertext = ciphertext[:0] // truncate the slice + nonce := crypto.NewRandomNonce() + ciphertext = append(ciphertext, nonce...) + return key.Seal(ciphertext, nonce, data, nil) + } + + filesPathToContent := make(map[string]string) + + for _, file := range content { + var content string + for _, blob := range file.blobs { + content += blob.data + + // get the pack, create as necessary + var pack Pack + var found bool + if pack, found = packs[blob.pack]; !found { + pack = Pack{name: blob.pack, blobs: make(map[restic.ID]restic.Blob)} + } + + // calculate blob id and add to the pack as necessary + blobID := restic.Hash([]byte(blob.data)) + if _, found := pack.blobs[blobID]; !found { + blobData := seal([]byte(blob.data)) + pack.blobs[blobID] = restic.Blob{ + Type: restic.DataBlob, + ID: blobID, + Length: uint(len(blobData)), + Offset: uint(len(pack.data)), + } + pack.data = append(pack.data, blobData...) 
+ } + + packs[blob.pack] = pack + } + filesPathToContent[file.name] = content + } + + blobs := make(map[restic.ID][]restic.PackedBlob) + packsIDToName := make(map[restic.ID]string) + packsIDToData := make(map[restic.ID][]byte) + packsNameToID := make(map[string]restic.ID) + + for _, pack := range packs { + packID := restic.Hash(pack.data) + packsIDToName[packID] = pack.name + packsIDToData[packID] = pack.data + packsNameToID[pack.name] = packID + for blobID, blob := range pack.blobs { + blobs[blobID] = append(blobs[blobID], restic.PackedBlob{Blob: blob, PackID: packID}) + } + } + + var files []*fileInfo + for _, file := range content { + content := restic.IDs{} + for _, blob := range file.blobs { + content = append(content, restic.Hash([]byte(blob.data))) + } + files = append(files, &fileInfo{location: file.name, blobs: content}) + } + + repo := &TestRepo{ + key: key, + packsIDToName: packsIDToName, + packsIDToData: packsIDToData, + packsNameToID: packsNameToID, + blobs: blobs, + files: files, + filesPathToContent: filesPathToContent, + } + repo.idx = filePackTraverser{lookup: repo.Lookup} + repo.loader = func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + packID, err := restic.ParseID(h.Name) + if err != nil { + return err + } + rd := bytes.NewReader(repo.packsIDToData[packID][int(offset) : int(offset)+length]) + return fn(rd) + } + + return repo +} + +func restoreAndVerify(t *testing.T, tempdir string, content []TestFile) { + repo := newTestRepo(content) + + r := newFileRestorer(tempdir, repo.loader, repo.key, repo.idx) + r.files = repo.files + + r.restoreFiles(context.TODO(), func(path string, err error) { + rtest.OK(t, errors.Wrapf(err, "unexpected error")) + }) + + for _, file := range repo.files { + target := r.targetPath(file.location) + data, err := ioutil.ReadFile(target) + if err != nil { + t.Errorf("unable to read file %v: %v", file.location, err) + continue + } + + rtest.Equals(t, false, 
r.filesWriter.writers.Contains(target)) + + content := repo.fileContent(file) + if !bytes.Equal(data, []byte(content)) { + t.Errorf("file %v has wrong content: want %q, got %q", file.location, content, data) + } + } + + rtest.OK(t, nil) +} + +func TestFileRestorerBasic(t *testing.T) { + tempdir, cleanup := rtest.TempDir(t) + defer cleanup() + + restoreAndVerify(t, tempdir, []TestFile{ + TestFile{ + name: "file1", + blobs: []TestBlob{ + TestBlob{"data1-1", "pack1-1"}, + TestBlob{"data1-2", "pack1-2"}, + }, + }, + TestFile{ + name: "file2", + blobs: []TestBlob{ + TestBlob{"data2-1", "pack2-1"}, + TestBlob{"data2-2", "pack2-2"}, + }, + }, + }) +} diff --git a/internal/restorer/fileswriter.go b/internal/restorer/fileswriter.go new file mode 100644 index 000000000..af7ea8428 --- /dev/null +++ b/internal/restorer/fileswriter.go @@ -0,0 +1,70 @@ +package restorer + +import ( + "io" + "os" + "sync" + + "github.com/hashicorp/golang-lru/simplelru" + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/errors" +) + +type filesWriter struct { + lock sync.Mutex // guards concurrent access + inprogress map[string]struct{} // (logically) opened file writers + writers simplelru.LRUCache // key: string, value: *os.File +} + +func newFilesWriter(count int) *filesWriter { + writers, _ := simplelru.NewLRU(count, func(key interface{}, value interface{}) { + value.(*os.File).Close() + debug.Log("Closed and purged cached writer for %v", key) + }) + return &filesWriter{inprogress: make(map[string]struct{}), writers: writers} +} + +func (w *filesWriter) writeToFile(path string, buf []byte) error { + acquireWriter := func() (io.Writer, error) { + w.lock.Lock() + defer w.lock.Unlock() + if wr, ok := w.writers.Get(path); ok { + debug.Log("Used cached writer for %s", path) + return wr.(*os.File), nil + } + var flags int + if _, append := w.inprogress[path]; append { + flags = os.O_APPEND | os.O_WRONLY + } else { + w.inprogress[path] = struct{}{} + flags = os.O_CREATE 
| os.O_TRUNC | os.O_WRONLY + } + wr, err := os.OpenFile(path, flags, 0600) + if err != nil { + return nil, err + } + w.writers.Add(path, wr) + debug.Log("Opened and cached writer for %s", path) + return wr, nil + } + + wr, err := acquireWriter() + if err != nil { + return err + } + n, err := wr.Write(buf) + if err != nil { + return err + } + if n != len(buf) { + return errors.Errorf("error writing file %v: wrong length written, want %d, got %d", path, len(buf), n) + } + return nil +} + +func (w *filesWriter) close(path string) { + w.lock.Lock() + defer w.lock.Unlock() + w.writers.Remove(path) + delete(w.inprogress, path) +} diff --git a/internal/restorer/fileswriter_test.go b/internal/restorer/fileswriter_test.go new file mode 100644 index 000000000..45c2a88fb --- /dev/null +++ b/internal/restorer/fileswriter_test.go @@ -0,0 +1,44 @@ +package restorer + +import ( + "io/ioutil" + "testing" + + rtest "github.com/restic/restic/internal/test" +) + +func TestFilesWriterBasic(t *testing.T) { + dir, cleanup := rtest.TempDir(t) + defer cleanup() + + w := newFilesWriter(1) + + f1 := dir + "/f1" + f2 := dir + "/f2" + + rtest.OK(t, w.writeToFile(f1, []byte{1})) + rtest.Equals(t, 1, w.writers.Len()) + rtest.Equals(t, 1, len(w.inprogress)) + + rtest.OK(t, w.writeToFile(f2, []byte{2})) + rtest.Equals(t, 1, w.writers.Len()) + rtest.Equals(t, 2, len(w.inprogress)) + + rtest.OK(t, w.writeToFile(f1, []byte{1})) + w.close(f1) + rtest.Equals(t, 0, w.writers.Len()) + rtest.Equals(t, 1, len(w.inprogress)) + + rtest.OK(t, w.writeToFile(f2, []byte{2})) + w.close(f2) + rtest.Equals(t, 0, w.writers.Len()) + rtest.Equals(t, 0, len(w.inprogress)) + + buf, err := ioutil.ReadFile(f1) + rtest.OK(t, err) + rtest.Equals(t, []byte{1, 1}, buf) + + buf, err = ioutil.ReadFile(f2) + rtest.OK(t, err) + rtest.Equals(t, []byte{2, 2}, buf) +} diff --git a/internal/restorer/packcache.go b/internal/restorer/packcache.go new file mode 100644 index 000000000..1eaad63bf --- /dev/null +++ 
b/internal/restorer/packcache.go @@ -0,0 +1,243 @@ +package restorer + +import ( + "io" + "sync" + + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/restic" +) + +// packCache is thread safe in-memory cache of pack files required to restore +// one or more files. The cache is meant to hold pack files that cannot be +// fully used right away. This happens when pack files contains blobs from +// "head" of some files and "middle" of other files. "Middle" blobs cannot be +// written to their files until after blobs from some other packs are written +// to the files first. +// +// While the cache is thread safe, implementation assumes (and enforces) +// that individual entries are used by one client at a time. Clients must +// #Close() entry's reader to make the entry available for use by other +// clients. This limitation can be relaxed in the future if necessary. +type packCache struct { + // guards access to cache internal data structures + lock sync.Mutex + + // cache capacity + capacity int + reservedCapacity int + allocatedCapacity int + + // pack records currently being used by active restore worker + reservedPacks map[restic.ID]*packCacheRecord + + // unused allocated packs, can be deleted if necessary + cachedPacks map[restic.ID]*packCacheRecord +} + +type packCacheRecord struct { + master *packCacheRecord + cache *packCache + + id restic.ID // cached pack id + offset int64 // cached pack byte range + + data []byte +} + +type readerAtCloser interface { + io.Closer + io.ReaderAt +} + +type bytesWriteSeeker struct { + pos int + data []byte +} + +func (wr *bytesWriteSeeker) Write(p []byte) (n int, err error) { + if wr.pos+len(p) > len(wr.data) { + return -1, errors.Errorf("not enough space") + } + n = copy(wr.data[wr.pos:], p) + wr.pos += n + return n, nil +} + +func (wr *bytesWriteSeeker) Seek(offset int64, whence int) (int64, error) { + if offset != 0 || whence != io.SeekStart { + return 
-1, errors.Errorf("unsupported seek request") + } + wr.pos = 0 + return 0, nil +} + +func newPackCache(capacity int) *packCache { + return &packCache{ + capacity: capacity, + reservedPacks: make(map[restic.ID]*packCacheRecord), + cachedPacks: make(map[restic.ID]*packCacheRecord), + } +} + +func (c *packCache) reserve(packID restic.ID, offset int64, length int) (record *packCacheRecord, err error) { + c.lock.Lock() + defer c.lock.Unlock() + + if offset < 0 || length <= 0 { + return nil, errors.Errorf("illegal pack cache allocation range %s {offset: %d, length: %d}", packID.Str(), offset, length) + } + + if c.reservedCapacity+length > c.capacity { + return nil, errors.Errorf("not enough cache capacity: requested %d, available %d", length, c.capacity-c.reservedCapacity) + } + + if _, ok := c.reservedPacks[packID]; ok { + return nil, errors.Errorf("pack is already reserved %s", packID.Str()) + } + + // the pack is available in the cache and currently unused + if pack, ok := c.cachedPacks[packID]; ok { + // check if cached pack includes requested byte range + // the range can shrink, but it never grows bigger unless there is a bug elsewhere + if pack.offset > offset || (pack.offset+int64(len(pack.data))) < (offset+int64(length)) { + return nil, errors.Errorf("cached range %d-%d is smaller than requested range %d-%d for pack %s", pack.offset, pack.offset+int64(len(pack.data)), length, offset+int64(length), packID.Str()) + } + + // move the pack to the used map + delete(c.cachedPacks, packID) + c.reservedPacks[packID] = pack + c.reservedCapacity += len(pack.data) + + debug.Log("Using cached pack %s (%d bytes)", pack.id.Str(), len(pack.data)) + + if pack.offset != offset || len(pack.data) != length { + // restrict returned record to requested range + return &packCacheRecord{ + cache: c, + master: pack, + offset: offset, + data: pack.data[int(offset-pack.offset) : int(offset-pack.offset)+length], + }, nil + } + + return pack, nil + } + + for c.allocatedCapacity+length > 
c.capacity { + // all cached packs will be needed at some point + // so it does not matter which one to purge + for _, cached := range c.cachedPacks { + delete(c.cachedPacks, cached.id) + c.allocatedCapacity -= len(cached.data) + debug.Log("dropped cached pack %s (%d bytes)", cached.id.Str(), len(cached.data)) + break + } + } + + pack := &packCacheRecord{ + cache: c, + id: packID, + offset: offset, + } + c.reservedPacks[pack.id] = pack + c.allocatedCapacity += length + c.reservedCapacity += length + + return pack, nil +} + +// get returns reader of the specified cached pack. Uses provided load func +// to download pack content if necessary. +// The returned reader is only able to read pack within byte range specified +// by offset and length parameters, attempts to read outside that range will +// result in an error. +// The returned reader must be closed before the same packID can be requested +// from the cache again. +func (c *packCache) get(packID restic.ID, offset int64, length int, load func(offset int64, length int, wr io.WriteSeeker) error) (readerAtCloser, error) { + pack, err := c.reserve(packID, offset, length) + if err != nil { + return nil, err + } + + if pack.data == nil { + releasePack := func() { + delete(c.reservedPacks, pack.id) + c.reservedCapacity -= length + c.allocatedCapacity -= length + } + wr := &bytesWriteSeeker{data: make([]byte, length)} + err = load(offset, length, wr) + if err != nil { + releasePack() + return nil, err + } + if wr.pos != length { + releasePack() + return nil, errors.Errorf("invalid read size") + } + pack.data = wr.data + debug.Log("Downloaded and cached pack %s (%d bytes)", pack.id.Str(), len(pack.data)) + } + + return pack, nil +} + +// releases the pack record back to the cache +func (c *packCache) release(pack *packCacheRecord) error { + c.lock.Lock() + defer c.lock.Unlock() + + if _, ok := c.reservedPacks[pack.id]; !ok { + return errors.Errorf("invalid pack release request") + } + + delete(c.reservedPacks, pack.id) 
+ c.cachedPacks[pack.id] = pack + c.reservedCapacity -= len(pack.data) + + return nil +} + +// remove removes specified pack from the cache and frees +// corresponding cache space. should be called after the pack +// was fully used up by the restorer. +func (c *packCache) remove(packID restic.ID) error { + c.lock.Lock() + defer c.lock.Unlock() + + if _, ok := c.reservedPacks[packID]; ok { + return errors.Errorf("invalid pack remove request, pack %s is reserved", packID.Str()) + } + + pack, ok := c.cachedPacks[packID] + if !ok { + return errors.Errorf("invalid pack remove request, pack %s is not cached", packID.Str()) + } + + delete(c.cachedPacks, pack.id) + c.allocatedCapacity -= len(pack.data) + + return nil +} + +// ReadAt reads len(b) bytes from the pack starting at byte offset off. +// It returns the number of bytes read and the error, if any. +func (r *packCacheRecord) ReadAt(b []byte, off int64) (n int, err error) { + if off < r.offset || off+int64(len(b)) > r.offset+int64(len(r.data)) { + return -1, errors.Errorf("read outside available range") + } + return copy(b, r.data[off-r.offset:]), nil +} + +// Close closes the pack reader and releases corresponding cache record +// to the cache. 
Once closed, the record can be reused by subsequent +// requests for the same packID or it can be purged from the cache to make +// room for other packs +func (r *packCacheRecord) Close() (err error) { + if r.master != nil { + return r.cache.release(r.master) + } + return r.cache.release(r) +} diff --git a/internal/restorer/packcache_test.go b/internal/restorer/packcache_test.go new file mode 100644 index 000000000..3a5f18cf5 --- /dev/null +++ b/internal/restorer/packcache_test.go @@ -0,0 +1,305 @@ +package restorer + +import ( + "io" + "testing" + + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/restic" + rtest "github.com/restic/restic/internal/test" +) + +func assertNotOK(t *testing.T, msg string, err error) { + rtest.Assert(t, err != nil, msg+" did not fail") +} + +func TestBytesWriterSeeker(t *testing.T) { + wr := &bytesWriteSeeker{data: make([]byte, 10)} + + n, err := wr.Write([]byte{1, 2}) + rtest.OK(t, err) + rtest.Equals(t, 2, n) + rtest.Equals(t, []byte{1, 2}, wr.data[0:2]) + + n64, err := wr.Seek(0, io.SeekStart) + rtest.OK(t, err) + rtest.Equals(t, int64(0), n64) + + n, err = wr.Write([]byte{0, 1, 2, 3, 4}) + rtest.OK(t, err) + rtest.Equals(t, 5, n) + n, err = wr.Write([]byte{5, 6, 7, 8, 9}) + rtest.OK(t, err) + rtest.Equals(t, 5, n) + rtest.Equals(t, []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, wr.data) + + // negative tests + _, err = wr.Write([]byte{1}) + assertNotOK(t, "write overflow", err) + _, err = wr.Seek(1, io.SeekStart) + assertNotOK(t, "unsupported seek", err) +} + +func TestPackCacheBasic(t *testing.T) { + assertReader := func(expected []byte, offset int64, rd io.ReaderAt) { + actual := make([]byte, len(expected)) + rd.ReadAt(actual, offset) + rtest.Equals(t, expected, actual) + } + + c := newPackCache(10) + + id := restic.NewRandomID() + + // load pack to the cache + rd, err := c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { + rtest.Equals(t, int64(10), offset) + rtest.Equals(t, 5, 
length) + wr.Write([]byte{1, 2, 3, 4, 5}) + return nil + }) + rtest.OK(t, err) + assertReader([]byte{1, 2, 3, 4, 5}, 10, rd) + + // must close pack reader before can request it again + _, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { + t.Error("unexpected cache load call") + return nil + }) + assertNotOK(t, "double-reservation", err) + + // close the pack reader and get it from cache + rd.Close() + rd, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { + t.Error("unexpected cache load call") + return nil + }) + rtest.OK(t, err) + assertReader([]byte{1, 2, 3, 4, 5}, 10, rd) + + // close the pack reader and remove the pack from cache, assert the pack is loaded on request + rd.Close() + c.remove(id) + rd, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { + rtest.Equals(t, int64(10), offset) + rtest.Equals(t, 5, length) + wr.Write([]byte{1, 2, 3, 4, 5}) + return nil + }) + rtest.OK(t, err) + assertReader([]byte{1, 2, 3, 4, 5}, 10, rd) +} + +func TestPackCacheInvalidRange(t *testing.T) { + c := newPackCache(10) + + id := restic.NewRandomID() + + _, err := c.get(id, -1, 1, func(offset int64, length int, wr io.WriteSeeker) error { + t.Error("unexpected cache load call") + return nil + }) + assertNotOK(t, "negative offset request", err) + + _, err = c.get(id, 0, 0, func(offset int64, length int, wr io.WriteSeeker) error { + t.Error("unexpected cache load call") + return nil + }) + assertNotOK(t, "zero length request", err) + + _, err = c.get(id, 0, -1, func(offset int64, length int, wr io.WriteSeeker) error { + t.Error("unexpected cache load call") + return nil + }) + assertNotOK(t, "negative length", err) +} + +func TestPackCacheCapacity(t *testing.T) { + c := newPackCache(10) + + id1, id2, id3 := restic.NewRandomID(), restic.NewRandomID(), restic.NewRandomID() + + // load and reserve pack1 + rd1, err := c.get(id1, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { 
+ wr.Write([]byte{1, 2, 3, 4, 5}) + return nil + }) + rtest.OK(t, err) + + // load and reserve pack2 + _, err = c.get(id2, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { + wr.Write([]byte{1, 2, 3, 4, 5}) + return nil + }) + rtest.OK(t, err) + + // can't load pack3 because not enough space in the cache + _, err = c.get(id3, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { + t.Error("unexpected cache load call") + return nil + }) + assertNotOK(t, "request over capacity", err) + + // release pack1 and try again + rd1.Close() + rd3, err := c.get(id3, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { + wr.Write([]byte{1, 2, 3, 4, 5}) + return nil + }) + rtest.OK(t, err) + + // release pack3 and load pack1 (should not come from cache) + rd3.Close() + loaded := false + rd1, err = c.get(id1, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { + wr.Write([]byte{1, 2, 3, 4, 5}) + loaded = true + return nil + }) + rtest.OK(t, err) + rtest.Equals(t, true, loaded) +} + +func TestPackCacheDownsizeRecord(t *testing.T) { + c := newPackCache(10) + + id := restic.NewRandomID() + + // get bigger range first + rd, err := c.get(id, 5, 5, func(offset int64, length int, wr io.WriteSeeker) error { + wr.Write([]byte{1, 2, 3, 4, 5}) + return nil + }) + rtest.OK(t, err) + rd.Close() + + // invalid "resize" requests + _, err = c.get(id, 5, 10, func(offset int64, length int, wr io.WriteSeeker) error { + t.Error("unexpected pack load") + return nil + }) + assertNotOK(t, "resize cached record", err) + + // invalid before cached range request + _, err = c.get(id, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { + t.Error("unexpected pack load") + return nil + }) + assertNotOK(t, "before cached range request", err) + + // invalid after cached range request + _, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { + t.Error("unexpected pack load") + return nil + }) + assertNotOK(t, "after cached 
range request", err) + + // now get smaller "nested" range + rd, err = c.get(id, 7, 1, func(offset int64, length int, wr io.WriteSeeker) error { + t.Error("unexpected pack load") + return nil + }) + rtest.OK(t, err) + + // assert expected data + buf := make([]byte, 1) + rd.ReadAt(buf, 7) + rtest.Equals(t, byte(3), buf[0]) + _, err = rd.ReadAt(buf, 0) + assertNotOK(t, "read before downsized pack range", err) + _, err = rd.ReadAt(buf, 9) + assertNotOK(t, "read after downsized pack range", err) + + // can't request downsized record again + _, err = c.get(id, 7, 1, func(offset int64, length int, wr io.WriteSeeker) error { + t.Error("unexpected pack load") + return nil + }) + assertNotOK(t, "double-allocation of cache record subrange", err) + + // can't request another subrange of the original record + _, err = c.get(id, 6, 1, func(offset int64, length int, wr io.WriteSeeker) error { + t.Error("unexpected pack load") + return nil + }) + assertNotOK(t, "allocation of another subrange of cache record", err) + + // release downsized record and assert the original is back in the cache + rd.Close() + rd, err = c.get(id, 5, 5, func(offset int64, length int, wr io.WriteSeeker) error { + t.Error("unexpected pack load") + return nil + }) + rtest.OK(t, err) + rd.Close() +} + +func TestPackCacheFailedDownload(t *testing.T) { + c := newPackCache(10) + assertEmpty := func() { + rtest.Equals(t, 0, len(c.cachedPacks)) + rtest.Equals(t, 10, c.capacity) + rtest.Equals(t, 0, c.reservedCapacity) + rtest.Equals(t, 0, c.allocatedCapacity) + } + + _, err := c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { + return errors.Errorf("expected induced test error") + }) + assertNotOK(t, "not enough bytes read", err) + assertEmpty() + + _, err = c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { + wr.Write([]byte{1}) + return nil + }) + assertNotOK(t, "not enough bytes read", err) + assertEmpty() + + _, err = 
c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { + wr.Write([]byte{1, 2, 3, 4, 5, 6}) + return nil + }) + assertNotOK(t, "too many bytes read", err) + assertEmpty() +} + +func TestPackCacheInvalidRequests(t *testing.T) { + c := newPackCache(10) + + id := restic.NewRandomID() + + // + rd, _ := c.get(id, 0, 1, func(offset int64, length int, wr io.WriteSeeker) error { + wr.Write([]byte{1}) + return nil + }) + assertNotOK(t, "remove() reserved pack", c.remove(id)) + rtest.OK(t, rd.Close()) + assertNotOK(t, "multiple reader Close() calls)", rd.Close()) + + // + rtest.OK(t, c.remove(id)) + assertNotOK(t, "double remove() the same pack", c.remove(id)) +} + +func TestPackCacheRecord(t *testing.T) { + rd := &packCacheRecord{ + offset: 10, + data: []byte{1}, + } + buf := make([]byte, 1) + n, err := rd.ReadAt(buf, 10) + rtest.OK(t, err) + rtest.Equals(t, 1, n) + rtest.Equals(t, byte(1), buf[0]) + + _, err = rd.ReadAt(buf, 0) + assertNotOK(t, "read before loaded range", err) + + _, err = rd.ReadAt(buf, 11) + assertNotOK(t, "read after loaded range", err) + + _, err = rd.ReadAt(make([]byte, 2), 10) + assertNotOK(t, "read more than available data", err) +} diff --git a/internal/restorer/packheap.go b/internal/restorer/packheap.go new file mode 100644 index 000000000..9f8443d46 --- /dev/null +++ b/internal/restorer/packheap.go @@ -0,0 +1,51 @@ +package restorer + +// packHeap is a heap of packInfo references +// @see https://golang.org/pkg/container/heap/ +// @see https://en.wikipedia.org/wiki/Heap_(data_structure) +type packHeap struct { + elements []*packInfo + + // returns true if download of any of the files is in progress + inprogress func(files map[*fileInfo]struct{}) bool +} + +func (pq *packHeap) Len() int { return len(pq.elements) } + +func (pq *packHeap) Less(a, b int) bool { + packA, packB := pq.elements[a], pq.elements[b] + + ap := pq.inprogress(packA.files) + bp := pq.inprogress(packB.files) + if ap && !bp { + return true + 
} + + if packA.cost < packB.cost { + return true + } + + return false +} + +func (pq *packHeap) Swap(i, j int) { + pq.elements[i], pq.elements[j] = pq.elements[j], pq.elements[i] + pq.elements[i].index = i + pq.elements[j].index = j +} + +func (pq *packHeap) Push(x interface{}) { + n := len(pq.elements) + item := x.(*packInfo) + item.index = n + pq.elements = append(pq.elements, item) +} + +func (pq *packHeap) Pop() interface{} { + old := pq.elements + n := len(old) + item := old[n-1] + item.index = -1 // for safety + pq.elements = old[0 : n-1] + return item +} diff --git a/internal/restorer/packqueue.go b/internal/restorer/packqueue.go new file mode 100644 index 000000000..fe8259846 --- /dev/null +++ b/internal/restorer/packqueue.go @@ -0,0 +1,224 @@ +package restorer + +import ( + "container/heap" + + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/restic" +) + +// packQueue tracks remaining file contents restore work and decides what pack +// to download and files to write next. +// +// The packs in the queue can be in one of three states: waiting, ready and +// in-progress. +// Waiting packs are the packs that only have blobs from the "middle" of their +// corresponding files and therefore cannot be used until blobs from some other +// packs are written to the files first. +// In-progress packs are the packs that were removed from the queue by #nextPack +// and must be first returned to the queue before they are considered again. +// Ready packs are the packs can be immediately used to restore at least one +// file. Internally ready packs are kept in a heap and are ordered according +// to these criteria: +// - Packs with "head" blobs of in-progress files are considered first. The +// idea is to complete restore of in-progress files before starting restore +// of other files. This is both more intuitive and also reduces number of +// open file handles needed during restore. +// - Packs with smallest cost are considered next. 
Pack cost is measured in
+// number of other packs required before all blobs in the pack can be used
+// and the pack can be removed from the pack cache.
+// For example, consider a file that requires two blobs, blob1 from pack1
+// and blob2 from pack2. The cost of pack2 is 1, because blob2 cannot be
+// used before blob1 is available. The higher the cost, the longer the pack
+// must be cached locally to avoid redownload.
+//
+// Pack queue implementation is NOT thread safe. All pack queue methods must
+// be called from single goroutine AND packInfo and fileInfo instances must
+// be updated synchronously from the same goroutine.
+type packQueue struct {
+	idx filePackTraverser
+
+	packs      map[restic.ID]*packInfo // waiting and ready packs
+	inprogress map[*packInfo]struct{}  // inprogress packs
+
+	heap *packHeap // heap of ready packs
+}
+
+func newPackQueue(idx filePackTraverser, files []*fileInfo, inprogress func(files map[*fileInfo]struct{}) bool) (*packQueue, error) {
+	packs := make(map[restic.ID]*packInfo) // all packs
+
+	// create packInfo from fileInfo
+	for _, file := range files {
+		err := idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool {
+			pack, ok := packs[packID]
+			if !ok {
+				pack = &packInfo{
+					id:    packID,
+					index: -1,
+					files: make(map[*fileInfo]struct{}),
+				}
+				packs[packID] = pack
+			}
+			pack.files[file] = struct{}{}
+			pack.cost += packIdx
+
+			return true // keep going
+		})
+		if err != nil {
+			// repository index is messed up, can't do anything
+			return nil, err
+		}
+	}
+
+	// create packInfo heap
+	pheap := &packHeap{inprogress: inprogress}
+	headPacks := restic.NewIDSet()
+	for _, file := range files {
+		idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool {
+			if !headPacks.Has(packID) {
+				headPacks.Insert(packID)
+				pack := packs[packID]
+				pack.index = len(pheap.elements)
+				pheap.elements = append(pheap.elements, pack)
+			}
+			return false // only first pack
+		})
+	}
+	
heap.Init(pheap)
+
+	return &packQueue{idx: idx, packs: packs, heap: pheap, inprogress: make(map[*packInfo]struct{})}, nil
+}
+
+// isEmpty returns true if the queue is empty, i.e. there are no more packs to
+// download and files to write to.
+func (h *packQueue) isEmpty() bool {
+	return len(h.packs) == 0 && len(h.inprogress) == 0
+}
+
+// nextPack returns next ready pack and corresponding files ready for download
+// and processing. The returned pack and the files are marked as "in progress"
+// internally and must be first returned to the queue before they are
+// considered by #nextPack again.
+func (h *packQueue) nextPack() (*packInfo, []*fileInfo) {
+	debug.Log("Ready packs %d, outstanding packs %d, inprogress packs %d", h.heap.Len(), len(h.packs), len(h.inprogress))
+
+	if h.heap.Len() == 0 {
+		return nil, nil
+	}
+
+	pack := heap.Pop(h.heap).(*packInfo)
+	h.inprogress[pack] = struct{}{}
+	debug.Log("Popped pack %s (%d files), heap size=%d", pack.id.Str(), len(pack.files), len(h.heap.elements))
+	var files []*fileInfo
+	for file := range pack.files {
+		h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
+			debug.Log("Pack #%d %s (%d blobs) used by %s", packIdx, packID.Str(), len(packBlobs), file.location)
+			if pack.id == packID {
+				files = append(files, file)
+			}
+			return false // only interested in the first pack here
+		})
+	}
+
+	return pack, files
+}
+
+// requeuePack conditionally adds back to the queue pack previously returned by
+// #nextPack.
+// If the pack is needed to restore any incomplete files, adds the pack to the
+// queue and adjusts order of all affected packs in the queue. Has no effect
+// if the pack is not required to restore any files.
+// Returns true if the pack was added to the queue, false otherwise.
+func (h *packQueue) requeuePack(pack *packInfo, success []*fileInfo, failure []*fileInfo) bool {
+	debug.Log("Requeue pack %s (%d/%d/%d files/success/failure)", pack.id.Str(), len(pack.files), len(success), len(failure))
+
+	// maintain inprogress pack set
+	delete(h.inprogress, pack)
+
+	affectedPacks := make(map[*packInfo]struct{})
+	affectedPacks[pack] = struct{}{} // this pack is always affected
+
+	// apply download success/failure to the packs
+	onFailure := func(file *fileInfo) {
+		h.idx.forEachFilePack(file, func(packInx int, packID restic.ID, _ []restic.Blob) bool {
+			pack := h.packs[packID]
+			delete(pack.files, file)
+			pack.cost -= packInx
+			affectedPacks[pack] = struct{}{}
+			return true // keep going
+		})
+	}
+	for _, file := range failure {
+		onFailure(file)
+	}
+	onSuccess := func(pack *packInfo, file *fileInfo) {
+		remove := true
+		h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool {
+			if packID.Equal(pack.id) {
+				// the pack has more blobs required by the file
+				remove = false
+			}
+			otherPack := h.packs[packID]
+			otherPack.cost--
+			affectedPacks[otherPack] = struct{}{}
+			return true // keep going
+		})
+		if remove {
+			delete(pack.files, file)
+		}
+	}
+	for _, file := range success {
+		onSuccess(pack, file)
+	}
+
+	// drop/update affected packs
+	isReady := func(affectedPack *packInfo) (ready bool) {
+		for file := range affectedPack.files {
+			h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool {
+				if packID.Equal(affectedPack.id) {
+					ready = true
+				}
+				return false // only file's first pack matters
+			})
+			if ready {
+				break
+			}
+		}
+		return ready
+	}
+	for affectedPack := range affectedPacks {
+		if _, inprogress := h.inprogress[affectedPack]; !inprogress {
+			if len(affectedPack.files) == 0 {
+				// drop the pack if it isn't inprogress and has no files that need it
+				if affectedPack.index >= 0 {
+					// This can't happen unless there is a bug elsewhere:
+					// - "current" pack isn't in the 
heap, hence its index must be < 0 + // - "other" packs can't be ready (i.e. in heap) unless they have other files + // in which case len(affectedPack.files) must be > 0 + debug.Log("corrupted ready heap: removed unexpected ready pack %s", affectedPack.id.Str()) + heap.Remove(h.heap, affectedPack.index) + } + delete(h.packs, affectedPack.id) + } else { + ready := isReady(affectedPack) + switch { + case ready && affectedPack.index < 0: + heap.Push(h.heap, affectedPack) + case ready && affectedPack.index >= 0: + heap.Fix(h.heap, affectedPack.index) + case !ready && affectedPack.index >= 0: + // This can't happen unless there is a bug elsewhere: + // - "current" pack isn't in the heap, hence its index must be < 0 + // - "other" packs can't have same head blobs as the "current" pack, + // hence "other" packs can't change their readiness + debug.Log("corrupted ready heap: removed unexpected waiting pack %s", affectedPack.id.Str()) + heap.Remove(h.heap, affectedPack.index) + case !ready && affectedPack.index < 0: + // do nothing + } + } + } + } + + return len(pack.files) > 0 +} diff --git a/internal/restorer/packqueue_test.go b/internal/restorer/packqueue_test.go new file mode 100644 index 000000000..880f7037a --- /dev/null +++ b/internal/restorer/packqueue_test.go @@ -0,0 +1,236 @@ +package restorer + +import ( + "testing" + + "github.com/restic/restic/internal/restic" + rtest "github.com/restic/restic/internal/test" +) + +func processPack(t *testing.T, data *TestRepo, pack *packInfo, files []*fileInfo) { + for _, file := range files { + data.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { + // assert file's head pack + rtest.Equals(t, pack.id, packID) + file.blobs = file.blobs[len(packBlobs):] + return false // only interested in the head pack + }) + } +} + +func TestPackQueueBasic(t *testing.T) { + data := newTestRepo([]TestFile{ + TestFile{ + name: "file", + blobs: []TestBlob{ + TestBlob{"data1", "pack1"}, + 
TestBlob{"data2", "pack2"}, + }, + }, + }) + + queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false }) + rtest.OK(t, err) + + // assert initial queue state + rtest.Equals(t, false, queue.isEmpty()) + rtest.Equals(t, 0, queue.packs[data.packID("pack1")].cost) + rtest.Equals(t, 1, queue.packs[data.packID("pack2")].cost) + + // get first pack + pack, files := queue.nextPack() + rtest.Equals(t, "pack1", data.packName(pack)) + rtest.Equals(t, 1, len(files)) + rtest.Equals(t, false, queue.isEmpty()) + // TODO assert pack is inprogress + + // can't process the second pack until the first one is processed + { + pack, files := queue.nextPack() + rtest.Equals(t, true, pack == nil) + rtest.Equals(t, true, files == nil) + rtest.Equals(t, false, queue.isEmpty()) + } + + // requeue the pack without processing + rtest.Equals(t, true, queue.requeuePack(pack, []*fileInfo{}, []*fileInfo{})) + rtest.Equals(t, false, queue.isEmpty()) + rtest.Equals(t, 0, queue.packs[data.packID("pack1")].cost) + rtest.Equals(t, 1, queue.packs[data.packID("pack2")].cost) + + // get the first pack again + pack, files = queue.nextPack() + rtest.Equals(t, "pack1", data.packName(pack)) + rtest.Equals(t, 1, len(files)) + rtest.Equals(t, false, queue.isEmpty()) + + // process the first pack and return it to the queue + processPack(t, data, pack, files) + rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) + rtest.Equals(t, 0, queue.packs[data.packID("pack2")].cost) + + // get the second pack + pack, files = queue.nextPack() + rtest.Equals(t, "pack2", data.packName(pack)) + rtest.Equals(t, 1, len(files)) + rtest.Equals(t, false, queue.isEmpty()) + + // process the second pack and return it to the queue + processPack(t, data, pack, files) + rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) + + // all packs processed + rtest.Equals(t, true, queue.isEmpty()) +} + +func TestPackQueueFailedFile(t *testing.T) { + // point of this 
test is to assert that requeuePack removes
+	// all references to files that failed restore
+
+	data := newTestRepo([]TestFile{
+		TestFile{
+			name: "file",
+			blobs: []TestBlob{
+				TestBlob{"data1", "pack1"},
+				TestBlob{"data2", "pack2"},
+			},
+		},
+	})
+
+	queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false })
+	rtest.OK(t, err)
+
+	pack, files := queue.nextPack()
+	rtest.Equals(t, false, queue.requeuePack(pack, []*fileInfo{}, files /*failed*/))
+	rtest.Equals(t, true, queue.isEmpty())
+}
+
+func TestPackQueueOrderingCost(t *testing.T) {
+	// assert pack1 is selected before pack2:
+	// pack1 is ready to restore file1, pack2 is ready to restore file2
+	// but pack2 cannot be immediately used to restore file1
+
+	data := newTestRepo([]TestFile{
+		TestFile{
+			name: "file1",
+			blobs: []TestBlob{
+				TestBlob{"data1", "pack1"},
+				TestBlob{"data2", "pack2"},
+			},
+		},
+		TestFile{
+			name: "file2",
+			blobs: []TestBlob{
+				TestBlob{"data2", "pack2"},
+			},
+		},
+	})
+
+	queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false })
+	rtest.OK(t, err)
+
+	// assert initial pack costs
+	rtest.Equals(t, 0, data.pack(queue, "pack1").cost)
+	rtest.Equals(t, 0, data.pack(queue, "pack1").index) // head of the heap
+	rtest.Equals(t, 1, data.pack(queue, "pack2").cost)
+	rtest.Equals(t, 1, data.pack(queue, "pack2").index)
+
+	pack, files := queue.nextPack()
+	// assert selected pack and queue state
+	rtest.Equals(t, "pack1", data.packName(pack))
+	// process the pack
+	processPack(t, data, pack, files)
+	rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{}))
+}
+
+func TestPackQueueOrderingInprogress(t *testing.T) {
+	// finish restoring one file before starting another
+
+	data := newTestRepo([]TestFile{
+		TestFile{
+			name: "file1",
+			blobs: []TestBlob{
+				TestBlob{"data1-1", "pack1-1"},
+				TestBlob{"data1-2", "pack1-2"},
+			},
+		},
+		TestFile{
+			name: "file2",
+			blobs: []TestBlob{
+				
TestBlob{"data2-1", "pack2-1"}, + TestBlob{"data2-2", "pack2-2"}, + }, + }, + }) + + var inprogress *fileInfo + queue, err := newPackQueue(data.idx, data.files, func(files map[*fileInfo]struct{}) bool { + _, found := files[inprogress] + return found + }) + rtest.OK(t, err) + + // first pack of a file + pack, files := queue.nextPack() + rtest.Equals(t, 1, len(files)) + file := files[0] + processPack(t, data, pack, files) + inprogress = files[0] + queue.requeuePack(pack, files, []*fileInfo{}) + + // second pack of the same file + pack, files = queue.nextPack() + rtest.Equals(t, 1, len(files)) + rtest.Equals(t, true, file == files[0]) // same file as before + processPack(t, data, pack, files) + inprogress = nil + queue.requeuePack(pack, files, []*fileInfo{}) + + // first pack of the second file + pack, files = queue.nextPack() + rtest.Equals(t, 1, len(files)) + rtest.Equals(t, false, file == files[0]) // different file as before +} + +func TestPackQueuePackMultiuse(t *testing.T) { + // the same pack is required multiple times to restore the same file + + data := newTestRepo([]TestFile{ + TestFile{ + name: "file", + blobs: []TestBlob{ + TestBlob{"data1", "pack1"}, + TestBlob{"data2", "pack2"}, + TestBlob{"data3", "pack1"}, // pack1 reuse, new blob + TestBlob{"data2", "pack2"}, // pack2 reuse, same blob + }, + }, + }) + + queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false }) + rtest.OK(t, err) + + pack, files := queue.nextPack() + rtest.Equals(t, "pack1", data.packName(pack)) + rtest.Equals(t, 1, len(pack.files)) + processPack(t, data, pack, files) + rtest.Equals(t, true, queue.requeuePack(pack, files, []*fileInfo{})) + + pack, files = queue.nextPack() + rtest.Equals(t, "pack2", data.packName(pack)) + rtest.Equals(t, 1, len(pack.files)) + processPack(t, data, pack, files) + rtest.Equals(t, true, queue.requeuePack(pack, files, []*fileInfo{})) + + pack, files = queue.nextPack() + rtest.Equals(t, "pack1", 
data.packName(pack)) + processPack(t, data, pack, files) + rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) + + pack, files = queue.nextPack() + rtest.Equals(t, "pack2", data.packName(pack)) + processPack(t, data, pack, files) + rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) + + rtest.Equals(t, true, queue.isEmpty()) +} diff --git a/internal/restorer/restorer.go b/internal/restorer/restorer.go index 73e844ac0..b6fb2b60e 100644 --- a/internal/restorer/restorer.go +++ b/internal/restorer/restorer.go @@ -18,16 +18,17 @@ type Restorer struct { repo restic.Repository sn *restic.Snapshot - Error func(dir string, node *restic.Node, err error) error + Error func(location string, err error) error SelectFilter func(item string, dstpath string, node *restic.Node) (selectedForRestore bool, childMayBeSelected bool) } -var restorerAbortOnAllErrors = func(str string, node *restic.Node, err error) error { return err } +var restorerAbortOnAllErrors = func(location string, err error) error { return err } // NewRestorer creates a restorer preloaded with the content from the snapshot id. 
func NewRestorer(repo restic.Repository, id restic.ID) (*Restorer, error) { r := &Restorer{ - repo: repo, Error: restorerAbortOnAllErrors, + repo: repo, + Error: restorerAbortOnAllErrors, SelectFilter: func(string, string, *restic.Node) (bool, bool) { return true, true }, } @@ -54,7 +55,7 @@ func (res *Restorer) traverseTree(ctx context.Context, target, location string, tree, err := res.repo.LoadTree(ctx, treeID) if err != nil { debug.Log("error loading tree %v: %v", treeID, err) - return res.Error(location, nil, err) + return res.Error(location, err) } for _, node := range tree.Nodes { @@ -64,7 +65,7 @@ func (res *Restorer) traverseTree(ctx context.Context, target, location string, nodeName := filepath.Base(filepath.Join(string(filepath.Separator), node.Name)) if nodeName != node.Name { debug.Log("node %q has invalid name %q", node.Name, nodeName) - err := res.Error(location, node, errors.New("node has invalid name")) + err := res.Error(location, errors.Errorf("invalid child node name %s", node.Name)) if err != nil { return err } @@ -77,7 +78,7 @@ func (res *Restorer) traverseTree(ctx context.Context, target, location string, if target == nodeTarget || !fs.HasPathPrefix(target, nodeTarget) { debug.Log("target: %v %v", target, nodeTarget) debug.Log("node %q has invalid target path %q", node.Name, nodeTarget) - err := res.Error(nodeLocation, node, errors.New("node has invalid path")) + err := res.Error(nodeLocation, errors.New("node has invalid path")) if err != nil { return err } @@ -94,7 +95,7 @@ func (res *Restorer) traverseTree(ctx context.Context, target, location string, sanitizeError := func(err error) error { if err != nil { - err = res.Error(nodeTarget, node, err) + err = res.Error(nodeLocation, err) } return err } @@ -139,10 +140,10 @@ func (res *Restorer) traverseTree(ctx context.Context, target, location string, return nil } -func (res *Restorer) restoreNodeTo(ctx context.Context, node *restic.Node, target, location string, idx *restic.HardlinkIndex) 
error { +func (res *Restorer) restoreNodeTo(ctx context.Context, node *restic.Node, target, location string) error { debug.Log("restoreNode %v %v %v", node.Name, target, location) - err := node.CreateAt(ctx, target, res.repo, idx) + err := node.CreateAt(ctx, target, res.repo) if err != nil { debug.Log("node.CreateAt(%s) error %v", target, err) } @@ -162,6 +163,31 @@ func (res *Restorer) restoreNodeMetadataTo(node *restic.Node, target, location s return err } +func (res *Restorer) restoreHardlinkAt(node *restic.Node, target, path, location string) error { + if err := fs.Remove(path); !os.IsNotExist(err) { + return errors.Wrap(err, "RemoveCreateHardlink") + } + err := fs.Link(target, path) + if err != nil { + return errors.Wrap(err, "CreateHardlink") + } + // TODO investigate if hardlinks have separate metadata on any supported system + return res.restoreNodeMetadataTo(node, path, location) +} + +func (res *Restorer) restoreEmptyFileAt(node *restic.Node, target, location string) error { + wr, err := os.OpenFile(target, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) + if err != nil { + return err + } + err = wr.Close() + if err != nil { + return err + } + + return res.restoreNodeMetadataTo(node, target, location) +} + // RestoreTo creates the directories and files in the snapshot below dst. // Before an item is created, res.Filter is called. 
func (res *Restorer) RestoreTo(ctx context.Context, dst string) error { @@ -173,35 +199,84 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error { } } - // make sure the target directory exists - err = fs.MkdirAll(dst, 0777) // umask takes care of dir permissions - if err != nil { - return errors.Wrap(err, "MkdirAll") + restoreNodeMetadata := func(node *restic.Node, target, location string) error { + return res.restoreNodeMetadataTo(node, target, location) } + noop := func(node *restic.Node, target, location string) error { return nil } idx := restic.NewHardlinkIndex() - return res.traverseTree(ctx, dst, string(filepath.Separator), *res.sn.Tree, treeVisitor{ + + filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), filePackTraverser{lookup: res.repo.Index().Lookup}) + + // first tree pass: create directories and collect all files to restore + err = res.traverseTree(ctx, dst, string(filepath.Separator), *res.sn.Tree, treeVisitor{ enterDir: func(node *restic.Node, target, location string) error { // create dir with default permissions // #leaveDir restores dir metadata after visiting all children return fs.MkdirAll(target, 0700) }, + visitNode: func(node *restic.Node, target, location string) error { // create parent dir with default permissions - // #leaveDir restores dir metadata after visiting all children + // second pass #leaveDir restores dir metadata after visiting/restoring all children err := fs.MkdirAll(filepath.Dir(target), 0700) if err != nil { return err } - return res.restoreNodeTo(ctx, node, target, location, idx) + if node.Type != "file" { + return nil + } + + if node.Size == 0 { + return nil // deal with empty files later + } + + if node.Links > 1 { + if idx.Has(node.Inode, node.DeviceID) { + return nil + } + idx.Add(node.Inode, node.DeviceID, location) + } + + filerestorer.addFile(location, node.Content) + + return nil }, - leaveDir: func(node *restic.Node, target, location string) error { - // Restore 
directory permissions and timestamp at the end. If we did it earlier - // - children restore could fail because of restictive directory permission - // - children restore could overwrite the timestamp of the directory they are in + leaveDir: noop, + }) + if err != nil { + return err + } + + err = filerestorer.restoreFiles(ctx, func(location string, err error) { res.Error(location, err) }) + if err != nil { + return err + } + + // second tree pass: restore special files and filesystem metadata + return res.traverseTree(ctx, dst, string(filepath.Separator), *res.sn.Tree, treeVisitor{ + enterDir: noop, + visitNode: func(node *restic.Node, target, location string) error { + if node.Type != "file" { + return res.restoreNodeTo(ctx, node, target, location) + } + + // create empty files, but not hardlinks to empty files + if node.Size == 0 && (node.Links < 2 || !idx.Has(node.Inode, node.DeviceID)) { + if node.Links > 1 { + idx.Add(node.Inode, node.DeviceID, location) + } + return res.restoreEmptyFileAt(node, target, location) + } + + if idx.Has(node.Inode, node.DeviceID) && idx.GetFilename(node.Inode, node.DeviceID) != location { + return res.restoreHardlinkAt(node, filerestorer.targetPath(idx.GetFilename(node.Inode, node.DeviceID)), target, location) + } + return res.restoreNodeMetadataTo(node, target, location) }, + leaveDir: restoreNodeMetadata, }) } diff --git a/internal/restorer/restorer_test.go b/internal/restorer/restorer_test.go index c5fdd6cb8..70136bfe3 100644 --- a/internal/restorer/restorer_test.go +++ b/internal/restorer/restorer_test.go @@ -24,7 +24,9 @@ type Snapshot struct { } type File struct { - Data string + Data string + Links uint64 + Inode uint64 } type Dir struct { @@ -44,26 +46,40 @@ func saveFile(t testing.TB, repo restic.Repository, node File) restic.ID { return id } -func saveDir(t testing.TB, repo restic.Repository, nodes map[string]Node) restic.ID { +func saveDir(t testing.TB, repo restic.Repository, nodes map[string]Node, inode uint64) 
restic.ID { ctx, cancel := context.WithCancel(context.Background()) defer cancel() tree := &restic.Tree{} for name, n := range nodes { - var id restic.ID + inode++ switch node := n.(type) { case File: - id = saveFile(t, repo, node) + fi := n.(File).Inode + if fi == 0 { + fi = inode + } + lc := n.(File).Links + if lc == 0 { + lc = 1 + } + fc := []restic.ID{} + if len(n.(File).Data) > 0 { + fc = append(fc, saveFile(t, repo, node)) + } tree.Insert(&restic.Node{ Type: "file", Mode: 0644, Name: name, UID: uint32(os.Getuid()), GID: uint32(os.Getgid()), - Content: []restic.ID{id}, + Content: fc, + Size: uint64(len(n.(File).Data)), + Inode: fi, + Links: lc, }) case Dir: - id = saveDir(t, repo, node.Nodes) + id := saveDir(t, repo, node.Nodes, inode) mode := node.Mode if mode == 0 { @@ -95,7 +111,7 @@ func saveSnapshot(t testing.TB, repo restic.Repository, snapshot Snapshot) (*res ctx, cancel := context.WithCancel(context.Background()) defer cancel() - treeID := saveDir(t, repo, snapshot.Nodes) + treeID := saveDir(t, repo, snapshot.Nodes, 1000) err := repo.Flush(ctx) if err != nil { @@ -131,18 +147,18 @@ func TestRestorer(t *testing.T) { var tests = []struct { Snapshot Files map[string]string - ErrorsMust map[string]string - ErrorsMay map[string]string + ErrorsMust map[string]map[string]struct{} + ErrorsMay map[string]map[string]struct{} Select func(item string, dstpath string, node *restic.Node) (selectForRestore bool, childMayBeSelected bool) }{ // valid test cases { Snapshot: Snapshot{ Nodes: map[string]Node{ - "foo": File{"content: foo\n"}, + "foo": File{Data: "content: foo\n"}, "dirtest": Dir{ Nodes: map[string]Node{ - "file": File{"content: file\n"}, + "file": File{Data: "content: file\n"}, }, }, }, @@ -155,13 +171,13 @@ func TestRestorer(t *testing.T) { { Snapshot: Snapshot{ Nodes: map[string]Node{ - "top": File{"toplevel file"}, + "top": File{Data: "toplevel file"}, "dir": Dir{ Nodes: map[string]Node{ - "file": File{"file in dir"}, + "file": File{Data: "file in 
dir"}, "subdir": Dir{ Nodes: map[string]Node{ - "file": File{"file in subdir"}, + "file": File{Data: "file in subdir"}, }, }, }, @@ -180,7 +196,7 @@ func TestRestorer(t *testing.T) { "dir": Dir{ Mode: 0444, }, - "file": File{"top-level file"}, + "file": File{Data: "top-level file"}, }, }, Files: map[string]string{ @@ -193,7 +209,7 @@ func TestRestorer(t *testing.T) { "dir": Dir{ Mode: 0555, Nodes: map[string]Node{ - "file": File{"file in dir"}, + "file": File{Data: "file in dir"}, }, }, }, @@ -205,7 +221,7 @@ func TestRestorer(t *testing.T) { { Snapshot: Snapshot{ Nodes: map[string]Node{ - "topfile": File{"top-level file"}, + "topfile": File{Data: "top-level file"}, }, }, Files: map[string]string{ @@ -217,7 +233,7 @@ func TestRestorer(t *testing.T) { Nodes: map[string]Node{ "dir": Dir{ Nodes: map[string]Node{ - "file": File{"content: file\n"}, + "file": File{Data: "content: file\n"}, }, }, }, @@ -242,40 +258,44 @@ func TestRestorer(t *testing.T) { { Snapshot: Snapshot{ Nodes: map[string]Node{ - `..\test`: File{"foo\n"}, - `..\..\foo\..\bar\..\xx\test2`: File{"test2\n"}, + `..\test`: File{Data: "foo\n"}, + `..\..\foo\..\bar\..\xx\test2`: File{Data: "test2\n"}, }, }, - ErrorsMay: map[string]string{ - `/#..\test`: "node has invalid name", - `/#..\..\foo\..\bar\..\xx\test2`: "node has invalid name", + ErrorsMay: map[string]map[string]struct{}{ + `/`: { + `invalid child node name ..\test`: struct{}{}, + `invalid child node name ..\..\foo\..\bar\..\xx\test2`: struct{}{}, + }, }, }, { Snapshot: Snapshot{ Nodes: map[string]Node{ - `../test`: File{"foo\n"}, - `../../foo/../bar/../xx/test2`: File{"test2\n"}, + `../test`: File{Data: "foo\n"}, + `../../foo/../bar/../xx/test2`: File{Data: "test2\n"}, }, }, - ErrorsMay: map[string]string{ - `/#../test`: "node has invalid name", - `/#../../foo/../bar/../xx/test2`: "node has invalid name", + ErrorsMay: map[string]map[string]struct{}{ + `/`: { + `invalid child node name ../test`: struct{}{}, + `invalid child node name 
../../foo/../bar/../xx/test2`: struct{}{}, + }, }, }, { Snapshot: Snapshot{ Nodes: map[string]Node{ - "top": File{"toplevel file"}, + "top": File{Data: "toplevel file"}, "x": Dir{ Nodes: map[string]Node{ - "file1": File{"file1"}, + "file1": File{Data: "file1"}, "..": Dir{ Nodes: map[string]Node{ - "file2": File{"file2"}, + "file2": File{Data: "file2"}, "..": Dir{ Nodes: map[string]Node{ - "file2": File{"file2"}, + "file2": File{Data: "file2"}, }, }, }, @@ -287,8 +307,10 @@ func TestRestorer(t *testing.T) { Files: map[string]string{ "top": "toplevel file", }, - ErrorsMust: map[string]string{ - `/x#..`: "node has invalid name", + ErrorsMust: map[string]map[string]struct{}{ + `/x`: { + `invalid child node name ..`: struct{}{}, + }, }, }, } @@ -326,11 +348,14 @@ func TestRestorer(t *testing.T) { return true, true } - errors := make(map[string]string) - res.Error = func(dir string, node *restic.Node, err error) error { - t.Logf("restore returned error for %q in dir %v: %v", node.Name, dir, err) - dir = toSlash(dir) - errors[dir+"#"+node.Name] = err.Error() + errors := make(map[string]map[string]struct{}) + res.Error = func(location string, err error) error { + location = toSlash(location) + t.Logf("restore returned error for %q: %v", location, err) + if errors[location] == nil { + errors[location] = make(map[string]struct{}) + } + errors[location][err.Error()] = struct{}{} return nil } @@ -342,33 +367,27 @@ func TestRestorer(t *testing.T) { t.Fatal(err) } - for filename, errorMessage := range test.ErrorsMust { - msg, ok := errors[filename] + for location, expectedErrors := range test.ErrorsMust { + actualErrors, ok := errors[location] if !ok { - t.Errorf("expected error for %v, found none", filename) + t.Errorf("expected error(s) for %v, found none", location) continue } - if msg != "" && msg != errorMessage { - t.Errorf("wrong error message for %v: got %q, want %q", - filename, msg, errorMessage) - } + rtest.Equals(t, expectedErrors, actualErrors) - delete(errors, 
filename) + delete(errors, location) } - for filename, errorMessage := range test.ErrorsMay { - msg, ok := errors[filename] + for location, expectedErrors := range test.ErrorsMay { + actualErrors, ok := errors[location] if !ok { continue } - if msg != "" && msg != errorMessage { - t.Errorf("wrong error message for %v: got %q, want %q", - filename, msg, errorMessage) - } + rtest.Equals(t, expectedErrors, actualErrors) - delete(errors, filename) + delete(errors, location) } for filename, err := range errors { @@ -398,10 +417,10 @@ func TestRestorerRelative(t *testing.T) { { Snapshot: Snapshot{ Nodes: map[string]Node{ - "foo": File{"content: foo\n"}, + "foo": File{Data: "content: foo\n"}, "dirtest": Dir{ Nodes: map[string]Node{ - "file": File{"content: file\n"}, + "file": File{Data: "content: file\n"}, }, }, }, @@ -433,10 +452,9 @@ func TestRestorerRelative(t *testing.T) { defer cleanup() errors := make(map[string]string) - res.Error = func(dir string, node *restic.Node, err error) error { - t.Logf("restore returned error for %q in dir %v: %v", node.Name, dir, err) - dir = toSlash(dir) - errors[dir+"#"+node.Name] = err.Error() + res.Error = func(location string, err error) error { + t.Logf("restore returned error for %q: %v", location, err) + errors[location] = err.Error() return nil } @@ -521,12 +539,12 @@ func TestRestorerTraverseTree(t *testing.T) { Snapshot: Snapshot{ Nodes: map[string]Node{ "dir": Dir{Nodes: map[string]Node{ - "otherfile": File{"x"}, + "otherfile": File{Data: "x"}, "subdir": Dir{Nodes: map[string]Node{ - "file": File{"content: file\n"}, + "file": File{Data: "content: file\n"}, }}, }}, - "foo": File{"content: foo\n"}, + "foo": File{Data: "content: foo\n"}, }, }, Select: func(item string, dstpath string, node *restic.Node) (selectForRestore bool, childMayBeSelected bool) { @@ -548,12 +566,12 @@ func TestRestorerTraverseTree(t *testing.T) { Snapshot: Snapshot{ Nodes: map[string]Node{ "dir": Dir{Nodes: map[string]Node{ - "otherfile": File{"x"}, + 
"otherfile": File{Data: "x"}, "subdir": Dir{Nodes: map[string]Node{ - "file": File{"content: file\n"}, + "file": File{Data: "content: file\n"}, }}, }}, - "foo": File{"content: foo\n"}, + "foo": File{Data: "content: foo\n"}, }, }, Select: func(item string, dstpath string, node *restic.Node) (selectForRestore bool, childMayBeSelected bool) { @@ -569,11 +587,11 @@ func TestRestorerTraverseTree(t *testing.T) { { Snapshot: Snapshot{ Nodes: map[string]Node{ - "aaa": File{"content: foo\n"}, + "aaa": File{Data: "content: foo\n"}, "dir": Dir{Nodes: map[string]Node{ - "otherfile": File{"x"}, + "otherfile": File{Data: "x"}, "subdir": Dir{Nodes: map[string]Node{ - "file": File{"content: file\n"}, + "file": File{Data: "content: file\n"}, }}, }}, }, @@ -594,12 +612,12 @@ func TestRestorerTraverseTree(t *testing.T) { Snapshot: Snapshot{ Nodes: map[string]Node{ "dir": Dir{Nodes: map[string]Node{ - "otherfile": File{"x"}, + "otherfile": File{Data: "x"}, "subdir": Dir{Nodes: map[string]Node{ - "file": File{"content: file\n"}, + "file": File{Data: "content: file\n"}, }}, }}, - "foo": File{"content: foo\n"}, + "foo": File{Data: "content: foo\n"}, }, }, Select: func(item string, dstpath string, node *restic.Node) (selectForRestore bool, childMayBeSelected bool) { @@ -623,12 +641,12 @@ func TestRestorerTraverseTree(t *testing.T) { Snapshot: Snapshot{ Nodes: map[string]Node{ "dir": Dir{Nodes: map[string]Node{ - "otherfile": File{"x"}, + "otherfile": File{Data: "x"}, "subdir": Dir{Nodes: map[string]Node{ - "file": File{"content: file\n"}, + "file": File{Data: "content: file\n"}, }}, }}, - "foo": File{"content: foo\n"}, + "foo": File{Data: "content: foo\n"}, }, }, Select: func(item string, dstpath string, node *restic.Node) (selectForRestore bool, childMayBeSelected bool) { diff --git a/internal/restorer/restorer_unix_test.go b/internal/restorer/restorer_unix_test.go new file mode 100644 index 000000000..fc80015c1 --- /dev/null +++ b/internal/restorer/restorer_unix_test.go @@ -0,0 +1,61 @@ 
+//+build !windows + +package restorer + +import ( + "context" + "os" + "path/filepath" + "syscall" + "testing" + + "github.com/restic/restic/internal/repository" + "github.com/restic/restic/internal/restic" + rtest "github.com/restic/restic/internal/test" +) + +func TestRestorerRestoreEmptyHardlinkedFileds(t *testing.T) { + repo, cleanup := repository.TestRepository(t) + defer cleanup() + + _, id := saveSnapshot(t, repo, Snapshot{ + Nodes: map[string]Node{ + "dirtest": Dir{ + Nodes: map[string]Node{ + "file1": File{Links: 2, Inode: 1}, + "file2": File{Links: 2, Inode: 1}, + }, + }, + }, + }) + + res, err := NewRestorer(repo, id) + rtest.OK(t, err) + + res.SelectFilter = func(item string, dstpath string, node *restic.Node) (selectedForRestore bool, childMayBeSelected bool) { + return true, true + } + + tempdir, cleanup := rtest.TempDir(t) + defer cleanup() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = res.RestoreTo(ctx, tempdir) + rtest.OK(t, err) + + f1, err := os.Stat(filepath.Join(tempdir, "dirtest/file1")) + rtest.OK(t, err) + rtest.Equals(t, int64(0), f1.Size()) + s1, ok1 := f1.Sys().(*syscall.Stat_t) + + f2, err := os.Stat(filepath.Join(tempdir, "dirtest/file2")) + rtest.OK(t, err) + rtest.Equals(t, int64(0), f2.Size()) + s2, ok2 := f2.Sys().(*syscall.Stat_t) + + if ok1 && ok2 { + rtest.Equals(t, s1.Ino, s2.Ino) + } +} diff --git a/vendor/github.com/hashicorp/golang-lru/LICENSE b/vendor/github.com/hashicorp/golang-lru/LICENSE new file mode 100644 index 000000000..be2cc4dfb --- /dev/null +++ b/vendor/github.com/hashicorp/golang-lru/LICENSE @@ -0,0 +1,362 @@ +Mozilla Public License, version 2.0 + +1. Definitions + +1.1. "Contributor" + + means each individual or legal entity that creates, contributes to the + creation of, or owns Covered Software. + +1.2. "Contributor Version" + + means the combination of the Contributions of others (if any) used by a + Contributor and that particular Contributor's Contribution. + +1.3. 
"Contribution" + + means Covered Software of a particular Contributor. + +1.4. "Covered Software" + + means Source Code Form to which the initial Contributor has attached the + notice in Exhibit A, the Executable Form of such Source Code Form, and + Modifications of such Source Code Form, in each case including portions + thereof. + +1.5. "Incompatible With Secondary Licenses" + means + + a. that the initial Contributor has attached the notice described in + Exhibit B to the Covered Software; or + + b. that the Covered Software was made available under the terms of + version 1.1 or earlier of the License, but not also under the terms of + a Secondary License. + +1.6. "Executable Form" + + means any form of the work other than Source Code Form. + +1.7. "Larger Work" + + means a work that combines Covered Software with other material, in a + separate file or files, that is not Covered Software. + +1.8. "License" + + means this document. + +1.9. "Licensable" + + means having the right to grant, to the maximum extent possible, whether + at the time of the initial grant or subsequently, any and all of the + rights conveyed by this License. + +1.10. "Modifications" + + means any of the following: + + a. any file in Source Code Form that results from an addition to, + deletion from, or modification of the contents of Covered Software; or + + b. any new file in Source Code Form that contains any Covered Software. + +1.11. "Patent Claims" of a Contributor + + means any patent claim(s), including without limitation, method, + process, and apparatus claims, in any patent Licensable by such + Contributor that would be infringed, but for the grant of the License, + by the making, using, selling, offering for sale, having made, import, + or transfer of either its Contributions or its Contributor Version. + +1.12. 
"Secondary License" + + means either the GNU General Public License, Version 2.0, the GNU Lesser + General Public License, Version 2.1, the GNU Affero General Public + License, Version 3.0, or any later versions of those licenses. + +1.13. "Source Code Form" + + means the form of the work preferred for making modifications. + +1.14. "You" (or "Your") + + means an individual or a legal entity exercising rights under this + License. For legal entities, "You" includes any entity that controls, is + controlled by, or is under common control with You. For purposes of this + definition, "control" means (a) the power, direct or indirect, to cause + the direction or management of such entity, whether by contract or + otherwise, or (b) ownership of more than fifty percent (50%) of the + outstanding shares or beneficial ownership of such entity. + + +2. License Grants and Conditions + +2.1. Grants + + Each Contributor hereby grants You a world-wide, royalty-free, + non-exclusive license: + + a. under intellectual property rights (other than patent or trademark) + Licensable by such Contributor to use, reproduce, make available, + modify, display, perform, distribute, and otherwise exploit its + Contributions, either on an unmodified basis, with Modifications, or + as part of a Larger Work; and + + b. under Patent Claims of such Contributor to make, use, sell, offer for + sale, have made, import, and otherwise transfer either its + Contributions or its Contributor Version. + +2.2. Effective Date + + The licenses granted in Section 2.1 with respect to any Contribution + become effective for each Contribution on the date the Contributor first + distributes such Contribution. + +2.3. Limitations on Grant Scope + + The licenses granted in this Section 2 are the only rights granted under + this License. No additional rights or licenses will be implied from the + distribution or licensing of Covered Software under this License. 
+ Notwithstanding Section 2.1(b) above, no patent license is granted by a + Contributor: + + a. for any code that a Contributor has removed from Covered Software; or + + b. for infringements caused by: (i) Your and any other third party's + modifications of Covered Software, or (ii) the combination of its + Contributions with other software (except as part of its Contributor + Version); or + + c. under Patent Claims infringed by Covered Software in the absence of + its Contributions. + + This License does not grant any rights in the trademarks, service marks, + or logos of any Contributor (except as may be necessary to comply with + the notice requirements in Section 3.4). + +2.4. Subsequent Licenses + + No Contributor makes additional grants as a result of Your choice to + distribute the Covered Software under a subsequent version of this + License (see Section 10.2) or under the terms of a Secondary License (if + permitted under the terms of Section 3.3). + +2.5. Representation + + Each Contributor represents that the Contributor believes its + Contributions are its original creation(s) or it has sufficient rights to + grant the rights to its Contributions conveyed by this License. + +2.6. Fair Use + + This License is not intended to limit any rights You have under + applicable copyright doctrines of fair use, fair dealing, or other + equivalents. + +2.7. Conditions + + Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted in + Section 2.1. + + +3. Responsibilities + +3.1. Distribution of Source Form + + All distribution of Covered Software in Source Code Form, including any + Modifications that You create or to which You contribute, must be under + the terms of this License. You must inform recipients that the Source + Code Form of the Covered Software is governed by the terms of this + License, and how they can obtain a copy of this License. You may not + attempt to alter or restrict the recipients' rights in the Source Code + Form. + +3.2. 
Distribution of Executable Form + + If You distribute Covered Software in Executable Form then: + + a. such Covered Software must also be made available in Source Code Form, + as described in Section 3.1, and You must inform recipients of the + Executable Form how they can obtain a copy of such Source Code Form by + reasonable means in a timely manner, at a charge no more than the cost + of distribution to the recipient; and + + b. You may distribute such Executable Form under the terms of this + License, or sublicense it under different terms, provided that the + license for the Executable Form does not attempt to limit or alter the + recipients' rights in the Source Code Form under this License. + +3.3. Distribution of a Larger Work + + You may create and distribute a Larger Work under terms of Your choice, + provided that You also comply with the requirements of this License for + the Covered Software. If the Larger Work is a combination of Covered + Software with a work governed by one or more Secondary Licenses, and the + Covered Software is not Incompatible With Secondary Licenses, this + License permits You to additionally distribute such Covered Software + under the terms of such Secondary License(s), so that the recipient of + the Larger Work may, at their option, further distribute the Covered + Software under the terms of either this License or such Secondary + License(s). + +3.4. Notices + + You may not remove or alter the substance of any license notices + (including copyright notices, patent notices, disclaimers of warranty, or + limitations of liability) contained within the Source Code Form of the + Covered Software, except that You may alter any license notices to the + extent required to remedy known factual inaccuracies. + +3.5. Application of Additional Terms + + You may choose to offer, and to charge a fee for, warranty, support, + indemnity or liability obligations to one or more recipients of Covered + Software. 
However, You may do so only on Your own behalf, and not on + behalf of any Contributor. You must make it absolutely clear that any + such warranty, support, indemnity, or liability obligation is offered by + You alone, and You hereby agree to indemnify every Contributor for any + liability incurred by such Contributor as a result of warranty, support, + indemnity or liability terms You offer. You may include additional + disclaimers of warranty and limitations of liability specific to any + jurisdiction. + +4. Inability to Comply Due to Statute or Regulation + + If it is impossible for You to comply with any of the terms of this License + with respect to some or all of the Covered Software due to statute, + judicial order, or regulation then You must: (a) comply with the terms of + this License to the maximum extent possible; and (b) describe the + limitations and the code they affect. Such description must be placed in a + text file included with all distributions of the Covered Software under + this License. Except to the extent prohibited by statute or regulation, + such description must be sufficiently detailed for a recipient of ordinary + skill to be able to understand it. + +5. Termination + +5.1. The rights granted under this License will terminate automatically if You + fail to comply with any of its terms. However, if You become compliant, + then the rights granted under this License from a particular Contributor + are reinstated (a) provisionally, unless and until such Contributor + explicitly and finally terminates Your grants, and (b) on an ongoing + basis, if such Contributor fails to notify You of the non-compliance by + some reasonable means prior to 60 days after You have come back into + compliance. 
Moreover, Your grants from a particular Contributor are + reinstated on an ongoing basis if such Contributor notifies You of the + non-compliance by some reasonable means, this is the first time You have + received notice of non-compliance with this License from such + Contributor, and You become compliant prior to 30 days after Your receipt + of the notice. + +5.2. If You initiate litigation against any entity by asserting a patent + infringement claim (excluding declaratory judgment actions, + counter-claims, and cross-claims) alleging that a Contributor Version + directly or indirectly infringes any patent, then the rights granted to + You by any and all Contributors for the Covered Software under Section + 2.1 of this License shall terminate. + +5.3. In the event of termination under Sections 5.1 or 5.2 above, all end user + license agreements (excluding distributors and resellers) which have been + validly granted by You or Your distributors under this License prior to + termination shall survive termination. + +6. Disclaimer of Warranty + + Covered Software is provided under this License on an "as is" basis, + without warranty of any kind, either expressed, implied, or statutory, + including, without limitation, warranties that the Covered Software is free + of defects, merchantable, fit for a particular purpose or non-infringing. + The entire risk as to the quality and performance of the Covered Software + is with You. Should any Covered Software prove defective in any respect, + You (not any Contributor) assume the cost of any necessary servicing, + repair, or correction. This disclaimer of warranty constitutes an essential + part of this License. No use of any Covered Software is authorized under + this License except under this disclaimer. + +7. 
Limitation of Liability + + Under no circumstances and under no legal theory, whether tort (including + negligence), contract, or otherwise, shall any Contributor, or anyone who + distributes Covered Software as permitted above, be liable to You for any + direct, indirect, special, incidental, or consequential damages of any + character including, without limitation, damages for lost profits, loss of + goodwill, work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses, even if such party shall have been + informed of the possibility of such damages. This limitation of liability + shall not apply to liability for death or personal injury resulting from + such party's negligence to the extent applicable law prohibits such + limitation. Some jurisdictions do not allow the exclusion or limitation of + incidental or consequential damages, so this exclusion and limitation may + not apply to You. + +8. Litigation + + Any litigation relating to this License may be brought only in the courts + of a jurisdiction where the defendant maintains its principal place of + business and such litigation shall be governed by laws of that + jurisdiction, without reference to its conflict-of-law provisions. Nothing + in this Section shall prevent a party's ability to bring cross-claims or + counter-claims. + +9. Miscellaneous + + This License represents the complete agreement concerning the subject + matter hereof. If any provision of this License is held to be + unenforceable, such provision shall be reformed only to the extent + necessary to make it enforceable. Any law or regulation which provides that + the language of a contract shall be construed against the drafter shall not + be used to construe this License against a Contributor. + + +10. Versions of the License + +10.1. New Versions + + Mozilla Foundation is the license steward. 
Except as provided in Section + 10.3, no one other than the license steward has the right to modify or + publish new versions of this License. Each version will be given a + distinguishing version number. + +10.2. Effect of New Versions + + You may distribute the Covered Software under the terms of the version + of the License under which You originally received the Covered Software, + or under the terms of any subsequent version published by the license + steward. + +10.3. Modified Versions + + If you create software not governed by this License, and you want to + create a new license for such software, you may create and use a + modified version of this License if you rename the license and remove + any references to the name of the license steward (except to note that + such modified license differs from this License). + +10.4. Distributing Source Code Form that is Incompatible With Secondary + Licenses If You choose to distribute Source Code Form that is + Incompatible With Secondary Licenses under the terms of this version of + the License, the notice described in Exhibit B of this License must be + attached. + +Exhibit A - Source Code Form License Notice + + This Source Code Form is subject to the + terms of the Mozilla Public License, v. + 2.0. If a copy of the MPL was not + distributed with this file, You can + obtain one at + http://mozilla.org/MPL/2.0/. + +If it is not possible or desirable to put the notice in a particular file, +then You may include the notice in a location (such as a LICENSE file in a +relevant directory) where a recipient would be likely to look for such a +notice. + +You may add additional accurate notices of copyright ownership. + +Exhibit B - "Incompatible With Secondary Licenses" Notice + + This Source Code Form is "Incompatible + With Secondary Licenses", as defined by + the Mozilla Public License, v. 2.0. 
diff --git a/vendor/github.com/hashicorp/golang-lru/simplelru/lru.go b/vendor/github.com/hashicorp/golang-lru/simplelru/lru.go new file mode 100644 index 000000000..5673773b2 --- /dev/null +++ b/vendor/github.com/hashicorp/golang-lru/simplelru/lru.go @@ -0,0 +1,161 @@ +package simplelru + +import ( + "container/list" + "errors" +) + +// EvictCallback is used to get a callback when a cache entry is evicted +type EvictCallback func(key interface{}, value interface{}) + +// LRU implements a non-thread safe fixed size LRU cache +type LRU struct { + size int + evictList *list.List + items map[interface{}]*list.Element + onEvict EvictCallback +} + +// entry is used to hold a value in the evictList +type entry struct { + key interface{} + value interface{} +} + +// NewLRU constructs an LRU of the given size +func NewLRU(size int, onEvict EvictCallback) (*LRU, error) { + if size <= 0 { + return nil, errors.New("Must provide a positive size") + } + c := &LRU{ + size: size, + evictList: list.New(), + items: make(map[interface{}]*list.Element), + onEvict: onEvict, + } + return c, nil +} + +// Purge is used to completely clear the cache. +func (c *LRU) Purge() { + for k, v := range c.items { + if c.onEvict != nil { + c.onEvict(k, v.Value.(*entry).value) + } + delete(c.items, k) + } + c.evictList.Init() +} + +// Add adds a value to the cache. Returns true if an eviction occurred. +func (c *LRU) Add(key, value interface{}) (evicted bool) { + // Check for existing item + if ent, ok := c.items[key]; ok { + c.evictList.MoveToFront(ent) + ent.Value.(*entry).value = value + return false + } + + // Add new item + ent := &entry{key, value} + entry := c.evictList.PushFront(ent) + c.items[key] = entry + + evict := c.evictList.Len() > c.size + // Verify size not exceeded + if evict { + c.removeOldest() + } + return evict +} + +// Get looks up a key's value from the cache. 
+func (c *LRU) Get(key interface{}) (value interface{}, ok bool) { + if ent, ok := c.items[key]; ok { + c.evictList.MoveToFront(ent) + return ent.Value.(*entry).value, true + } + return +} + +// Contains checks if a key is in the cache, without updating the recent-ness +// or deleting it for being stale. +func (c *LRU) Contains(key interface{}) (ok bool) { + _, ok = c.items[key] + return ok +} + +// Peek returns the key value (or undefined if not found) without updating +// the "recently used"-ness of the key. +func (c *LRU) Peek(key interface{}) (value interface{}, ok bool) { + var ent *list.Element + if ent, ok = c.items[key]; ok { + return ent.Value.(*entry).value, true + } + return nil, ok +} + +// Remove removes the provided key from the cache, returning if the +// key was contained. +func (c *LRU) Remove(key interface{}) (present bool) { + if ent, ok := c.items[key]; ok { + c.removeElement(ent) + return true + } + return false +} + +// RemoveOldest removes the oldest item from the cache. +func (c *LRU) RemoveOldest() (key interface{}, value interface{}, ok bool) { + ent := c.evictList.Back() + if ent != nil { + c.removeElement(ent) + kv := ent.Value.(*entry) + return kv.key, kv.value, true + } + return nil, nil, false +} + +// GetOldest returns the oldest entry +func (c *LRU) GetOldest() (key interface{}, value interface{}, ok bool) { + ent := c.evictList.Back() + if ent != nil { + kv := ent.Value.(*entry) + return kv.key, kv.value, true + } + return nil, nil, false +} + +// Keys returns a slice of the keys in the cache, from oldest to newest. +func (c *LRU) Keys() []interface{} { + keys := make([]interface{}, len(c.items)) + i := 0 + for ent := c.evictList.Back(); ent != nil; ent = ent.Prev() { + keys[i] = ent.Value.(*entry).key + i++ + } + return keys +} + +// Len returns the number of items in the cache. +func (c *LRU) Len() int { + return c.evictList.Len() +} + +// removeOldest removes the oldest item from the cache. 
+func (c *LRU) removeOldest() { + ent := c.evictList.Back() + if ent != nil { + c.removeElement(ent) + } +} + +// removeElement is used to remove a given list element from the cache +func (c *LRU) removeElement(e *list.Element) { + c.evictList.Remove(e) + kv := e.Value.(*entry) + delete(c.items, kv.key) + if c.onEvict != nil { + c.onEvict(kv.key, kv.value) + } +} diff --git a/vendor/github.com/hashicorp/golang-lru/simplelru/lru_interface.go b/vendor/github.com/hashicorp/golang-lru/simplelru/lru_interface.go new file mode 100644 index 000000000..74c707744 --- /dev/null +++ b/vendor/github.com/hashicorp/golang-lru/simplelru/lru_interface.go @@ -0,0 +1,36 @@ +package simplelru + +// LRUCache is the interface for simple LRU cache. +type LRUCache interface { + // Adds a value to the cache, returns true if an eviction occurred and + // updates the "recently used"-ness of the key. + Add(key, value interface{}) bool + + // Returns key's value from the cache and + // updates the "recently used"-ness of the key. #value, isFound + Get(key interface{}) (value interface{}, ok bool) + + // Check if a key exsists in cache without updating the recent-ness. + Contains(key interface{}) (ok bool) + + // Returns key's value without updating the "recently used"-ness of the key. + Peek(key interface{}) (value interface{}, ok bool) + + // Removes a key from the cache. + Remove(key interface{}) bool + + // Removes the oldest entry from cache. + RemoveOldest() (interface{}, interface{}, bool) + + // Returns the oldest entry from the cache. #key, value, isFound + GetOldest() (interface{}, interface{}, bool) + + // Returns a slice of the keys in the cache, from oldest to newest. + Keys() []interface{} + + // Returns the number of items in the cache. 
+ Len() int + + // Clear all cache entries + Purge() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 436383773..aa1154aa9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -32,6 +32,8 @@ github.com/google/go-cmp/cmp/cmpopts github.com/google/go-cmp/cmp/internal/diff github.com/google/go-cmp/cmp/internal/function github.com/google/go-cmp/cmp/internal/value +# github.com/hashicorp/golang-lru v0.5.0 +github.com/hashicorp/golang-lru/simplelru # github.com/inconshreveable/mousetrap v1.0.0 github.com/inconshreveable/mousetrap # github.com/juju/ratelimit v1.0.1