diff --git a/changelog/unreleased/pull-2195 b/changelog/unreleased/pull-2195 new file mode 100644 index 000000000..c2dac8bdf --- /dev/null +++ b/changelog/unreleased/pull-2195 @@ -0,0 +1,17 @@ +Enhancement: Simplify and improve restore performance + +Significantly improves restore performance of large files (i.e. 50M+): +https://github.com/restic/restic/issues/2074 +https://forum.restic.net/t/restore-using-rclone-gdrive-backend-is-slow/1112/8 +https://forum.restic.net/t/degraded-restore-performance-s3-backend/1400 + +Fixes "not enough cache capacity" error during restore: +https://github.com/restic/restic/issues/2244 + +NOTE: This new implementation does not guarantee order in which blobs +are written to the target files and, for example, the last blob of a +file can be written to the file before any of the preceeding file blobs. +It is therefore possible to have gaps in the data written to the target +files if restore fails or interrupted by the user. + +https://github.com/restic/restic/pull/2195 diff --git a/internal/restorer/doc.go b/internal/restorer/doc.go index b3583c728..5a4622ea6 100644 --- a/internal/restorer/doc.go +++ b/internal/restorer/doc.go @@ -5,29 +5,20 @@ // request and avoiding repeated downloads of the same pack. In addition, // several pack files are fetched concurrently. // -// Here is high-level pseudo-code of the how the Restorer attempts to achieve +// Here is high-level pseudo-code of how the Restorer attempts to achieve // these goals: // // while there are packs to process // choose a pack to process [1] -// get the pack from the backend or cache [2] +// retrieve the pack from the backend [2] // write pack blobs to the files that need them [3] -// if not all pack blobs were used -// cache the pack for future use [4] // -// Pack download and processing (steps [2] - [4]) runs on multiple concurrent -// Goroutines. The Restorer runs all steps [2]-[4] sequentially on the same -// Goroutine. +// Retrieval of repository packs (step [2]) and writing target files (step [3]) +// are performed concurrently on multiple goroutines. // -// Before a pack is downloaded (step [2]), the required space is "reserved" in -// the pack cache. Actual download uses single backend request to get all -// required pack blobs. This may download blobs that are not needed, but we -// assume it'll still be faster than getting individual blobs. -// -// Target files are written (step [3]) in the "right" order, first file blob -// first, then second, then third and so on. Blob write order implies that some -// pack blobs may not be immediately used, i.e. they are "out of order" for -// their respective target files. Packs with unused blobs are cached (step -// [4]). The cache has capacity limit and may purge packs before they are fully -// used, in which case the purged packs will need to be re-downloaded. +// Implementation does not guarantee order in which blobs are written to the +// target files and, for example, the last blob of a file can be written to the +// file before any of the preceeding file blobs. It is therefore possible to +// have gaps in the data written to the target files if restore fails or +// interrupted by the user. package restorer diff --git a/internal/restorer/filepacktraverser.go b/internal/restorer/filepacktraverser.go deleted file mode 100644 index bba61e0f9..000000000 --- a/internal/restorer/filepacktraverser.go +++ /dev/null @@ -1,52 +0,0 @@ -package restorer - -import ( - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/restic" -) - -type filePackTraverser struct { - lookup func(restic.ID, restic.BlobType) ([]restic.PackedBlob, bool) -} - -// iterates over all remaining packs of the file -func (t *filePackTraverser) forEachFilePack(file *fileInfo, fn func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool) error { - if len(file.blobs) == 0 { - return nil - } - - getBlobPack := func(blobID restic.ID) (restic.PackedBlob, error) { - packs, found := t.lookup(blobID, restic.DataBlob) - if !found { - return restic.PackedBlob{}, errors.Errorf("Unknown blob %s", blobID.String()) - } - // TODO which pack to use if multiple packs have the blob? - // MUST return the same pack for the same blob during the same execution - return packs[0], nil - } - - var prevPackID restic.ID - var prevPackBlobs []restic.Blob - packIdx := 0 - for _, blobID := range file.blobs { - packedBlob, err := getBlobPack(blobID) - if err != nil { - return err - } - if !prevPackID.IsNull() && prevPackID != packedBlob.PackID { - if !fn(packIdx, prevPackID, prevPackBlobs) { - return nil - } - packIdx++ - } - if prevPackID != packedBlob.PackID { - prevPackID = packedBlob.PackID - prevPackBlobs = make([]restic.Blob, 0) - } - prevPackBlobs = append(prevPackBlobs, packedBlob.Blob) - } - if len(prevPackBlobs) > 0 { - fn(packIdx, prevPackID, prevPackBlobs) - } - return nil -} diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index c59b061b0..f4826e586 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -1,9 +1,12 @@ package restorer import ( + "bytes" "context" "io" + "math" "path/filepath" + "sync" "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/debug" @@ -15,66 +18,58 @@ import ( // TODO evaluate if it makes sense to split download and processing workers // pro: can (slowly) read network and decrypt/write files concurrently // con: each worker needs to keep one pack in memory -// TODO evaluate memory footprint for larger repositories, say 10M packs/10M files -// TODO consider replacing pack file cache with blob cache -// TODO avoid decrypting the same blob multiple times -// TODO evaluate disabled debug logging overhead for large repositories const ( workerCount = 8 - // max number of cached open output file handles - filesWriterCacheCap = 32 + // fileInfo flags + fileProgress = 1 + fileError = 2 - // estimated average pack size used to calculate pack cache capacity - averagePackSize = 5 * 1024 * 1024 - - // pack cache capacity should support at least one cached pack per worker - // allow space for extra 5 packs for actual caching - packCacheCapacity = (workerCount + 5) * averagePackSize + largeFileBlobCount = 25 ) // information about regular file being restored type fileInfo struct { + lock sync.Mutex + flags int location string // file on local filesystem relative to restorer basedir - blobs []restic.ID // remaining blobs of the file + blobs interface{} // blobs of the file +} + +type fileBlobInfo struct { + id restic.ID // the blob id + offset int64 // blob offset in the file } // information about a data pack required to restore one or more files type packInfo struct { - // the pack id - id restic.ID - - // set of files that use blobs from this pack - files map[*fileInfo]struct{} - - // number of other packs that must be downloaded before all blobs in this pack can be used - cost int - - // used by packHeap - index int + id restic.ID // the pack id + files map[*fileInfo]struct{} // set of files that use blobs from this pack } // fileRestorer restores set of files type fileRestorer struct { key *crypto.Key - idx filePackTraverser + idx func(restic.ID, restic.BlobType) ([]restic.PackedBlob, bool) packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error - packCache *packCache // pack cache - filesWriter *filesWriter // file write + filesWriter *filesWriter dst string files []*fileInfo } -func newFileRestorer(dst string, packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error, key *crypto.Key, idx filePackTraverser) *fileRestorer { +func newFileRestorer(dst string, + packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error, + key *crypto.Key, + idx func(restic.ID, restic.BlobType) ([]restic.PackedBlob, bool)) *fileRestorer { + return &fileRestorer{ - packLoader: packLoader, key: key, idx: idx, - filesWriter: newFilesWriter(filesWriterCacheCap), - packCache: newPackCache(packCacheCapacity), + packLoader: packLoader, + filesWriter: newFilesWriter(workerCount), dst: dst, } } @@ -87,237 +82,237 @@ func (r *fileRestorer) targetPath(location string) string { return filepath.Join(r.dst, location) } -// used to pass information among workers (wish golang channels allowed multivalues) -type processingInfo struct { - pack *packInfo - files map[*fileInfo]error -} - -func (r *fileRestorer) restoreFiles(ctx context.Context, onError func(path string, err error)) error { - // TODO conditionally enable when debug log is on - // for _, file := range r.files { - // dbgmsg := file.location + ": " - // r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { - // if packIdx > 0 { - // dbgmsg += ", " - // } - // dbgmsg += "pack{id=" + packID.Str() + ", blobs: " - // for blobIdx, blob := range packBlobs { - // if blobIdx > 0 { - // dbgmsg += ", " - // } - // dbgmsg += blob.ID.Str() - // } - // dbgmsg += "}" - // return true // keep going - // }) - // debug.Log(dbgmsg) - // } - - inprogress := make(map[*fileInfo]struct{}) - queue, err := newPackQueue(r.idx, r.files, func(files map[*fileInfo]struct{}) bool { - for file := range files { - if _, found := inprogress[file]; found { - return true - } - } - return false - }) - if err != nil { - return err +func (r *fileRestorer) forEachBlob(blobIDs []restic.ID, fn func(packID restic.ID, packBlob restic.Blob)) error { + if len(blobIDs) == 0 { + return nil } - // workers - downloadCh := make(chan processingInfo) - feedbackCh := make(chan processingInfo) - - defer close(downloadCh) - defer close(feedbackCh) - - worker := func() { - for { - select { - case <-ctx.Done(): - return - case request, ok := <-downloadCh: - if !ok { - return // channel closed - } - rd, err := r.downloadPack(ctx, request.pack) - if err == nil { - r.processPack(ctx, request, rd) - } else { - // mark all files as failed - for file := range request.files { - request.files[file] = err - } - } - feedbackCh <- request - } - } - } - for i := 0; i < workerCount; i++ { - go worker() - } - - processFeedback := func(pack *packInfo, ferrors map[*fileInfo]error) { - // update files blobIdx - // must do it here to avoid race among worker and processing feedback threads - var success []*fileInfo - var failure []*fileInfo - for file, ferr := range ferrors { - target := r.targetPath(file.location) - if ferr != nil { - onError(file.location, ferr) - r.filesWriter.close(target) - delete(inprogress, file) - failure = append(failure, file) - } else { - r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { - file.blobs = file.blobs[len(packBlobs):] - return false // only interesed in the first pack - }) - if len(file.blobs) == 0 { - r.filesWriter.close(target) - delete(inprogress, file) - } - success = append(success, file) - } - } - // update the queue and requeueu the pack as necessary - if !queue.requeuePack(pack, success, failure) { - r.packCache.remove(pack.id) - debug.Log("Purged used up pack %s from pack cache", pack.id.Str()) - } - } - - // the main restore loop - for !queue.isEmpty() { - debug.Log("-----------------------------------") - pack, files := queue.nextPack() - if pack != nil { - ferrors := make(map[*fileInfo]error) - for _, file := range files { - ferrors[file] = nil - inprogress[file] = struct{}{} - } - select { - case <-ctx.Done(): - return ctx.Err() - case downloadCh <- processingInfo{pack: pack, files: ferrors}: - debug.Log("Scheduled download pack %s (%d files)", pack.id.Str(), len(files)) - case feedback := <-feedbackCh: - queue.requeuePack(pack, []*fileInfo{}, []*fileInfo{}) // didn't use the pack during this iteration - processFeedback(feedback.pack, feedback.files) - } - } else { - select { - case <-ctx.Done(): - return ctx.Err() - case feedback := <-feedbackCh: - processFeedback(feedback.pack, feedback.files) - } + for _, blobID := range blobIDs { + packs, found := r.idx(blobID, restic.DataBlob) + if !found { + return errors.Errorf("Unknown blob %s", blobID.String()) } + fn(packs[0].PackID, packs[0].Blob) } return nil } -func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) (readerAtCloser, error) { - const MaxInt64 = 1<<63 - 1 // odd Go does not have this predefined somewhere +func (r *fileRestorer) restoreFiles(ctx context.Context) error { - // calculate pack byte range - start, end := int64(MaxInt64), int64(0) - for file := range pack.files { - r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { - if packID.Equal(pack.id) { - for _, blob := range packBlobs { - if start > int64(blob.Offset) { - start = int64(blob.Offset) - } - if end < int64(blob.Offset+blob.Length) { - end = int64(blob.Offset + blob.Length) - } - } + packs := make(map[restic.ID]*packInfo) // all packs + + // create packInfo from fileInfo + for _, file := range r.files { + fileBlobs := file.blobs.(restic.IDs) + largeFile := len(fileBlobs) > largeFileBlobCount + var packsMap map[restic.ID][]fileBlobInfo + if largeFile { + packsMap = make(map[restic.ID][]fileBlobInfo) + } + fileOffset := int64(0) + err := r.forEachBlob(fileBlobs, func(packID restic.ID, blob restic.Blob) { + if largeFile { + packsMap[packID] = append(packsMap[packID], fileBlobInfo{id: blob.ID, offset: fileOffset}) + fileOffset += int64(blob.Length) - crypto.Extension } - - return true // keep going + pack, ok := packs[packID] + if !ok { + pack = &packInfo{ + id: packID, + files: make(map[*fileInfo]struct{}), + } + packs[packID] = pack + } + pack.files[file] = struct{}{} }) + if err != nil { + // repository index is messed up, can't do anything + return err + } + if largeFile { + file.blobs = packsMap + } } - packReader, err := r.packCache.get(pack.id, start, int(end-start), func(offset int64, length int, wr io.WriteSeeker) error { - h := restic.Handle{Type: restic.DataFile, Name: pack.id.String()} - return r.packLoader(ctx, h, length, offset, func(rd io.Reader) error { - // reset the file in case of a download retry - _, err := wr.Seek(0, io.SeekStart) - if err != nil { - return err + var wg sync.WaitGroup + downloadCh := make(chan *packInfo) + worker := func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return // context cancelled + case pack, ok := <-downloadCh: + if !ok { + return // channel closed + } + r.downloadPack(ctx, pack) } + } + } + for i := 0; i < workerCount; i++ { + go worker() + wg.Add(1) + } - len, err := io.Copy(wr, rd) - if err != nil { - return err - } - if len != int64(length) { - return errors.Errorf("unexpected pack size: expected %d but got %d", length, len) - } + // the main restore loop + for _, pack := range packs { + select { + case <-ctx.Done(): + return ctx.Err() + case downloadCh <- pack: + debug.Log("Scheduled download pack %s", pack.id.Str()) + } + } - return nil - }) + close(downloadCh) + wg.Wait() + + return nil +} + +func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) { + + // calculate pack byte range and blob->[]files->[]offsets mappings + start, end := int64(math.MaxInt64), int64(0) + blobs := make(map[restic.ID]struct { + offset int64 // offset of the blob in the pack + length int // length of the blob + files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file }) - if err != nil { - return nil, err - } - - return packReader, nil -} - -func (r *fileRestorer) processPack(ctx context.Context, request processingInfo, rd readerAtCloser) { - defer rd.Close() - - for file := range request.files { - target := r.targetPath(file.location) - r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { - for _, blob := range packBlobs { - debug.Log("Writing blob %s (%d bytes) from pack %s to %s", blob.ID.Str(), blob.Length, packID.Str(), file.location) - buf, err := r.loadBlob(rd, blob) - if err == nil { - err = r.filesWriter.writeToFile(target, buf) + for file := range pack.files { + addBlob := func(blob restic.Blob, fileOffset int64) { + if start > int64(blob.Offset) { + start = int64(blob.Offset) + } + if end < int64(blob.Offset+blob.Length) { + end = int64(blob.Offset + blob.Length) + } + blobInfo, ok := blobs[blob.ID] + if !ok { + blobInfo.offset = int64(blob.Offset) + blobInfo.length = int(blob.Length) + blobInfo.files = make(map[*fileInfo][]int64) + blobs[blob.ID] = blobInfo + } + blobInfo.files[file] = append(blobInfo.files[file], fileOffset) + } + if fileBlobs, ok := file.blobs.(restic.IDs); ok { + fileOffset := int64(0) + r.forEachBlob(fileBlobs, func(packID restic.ID, blob restic.Blob) { + if packID.Equal(pack.id) { + addBlob(blob, fileOffset) } - if err != nil { - request.files[file] = err - break // could not restore the file + fileOffset += int64(blob.Length) - crypto.Extension + }) + } else if packsMap, ok := file.blobs.(map[restic.ID][]fileBlobInfo); ok { + for _, blob := range packsMap[pack.id] { + idxPacks, found := r.idx(blob.id, restic.DataBlob) + if found { + for _, idxPack := range idxPacks { + if idxPack.PackID.Equal(pack.id) { + addBlob(idxPack.Blob, blob.offset) + break + } + } } } - return false - }) + } + } + + packData := make([]byte, int(end-start)) + + h := restic.Handle{Type: restic.DataFile, Name: pack.id.String()} + err := r.packLoader(ctx, h, int(end-start), start, func(rd io.Reader) error { + l, err := io.ReadFull(rd, packData) + if err != nil { + return err + } + if l != len(packData) { + return errors.Errorf("unexpected pack size: expected %d but got %d", len(packData), l) + } + return nil + }) + + markFileError := func(file *fileInfo, err error) { + file.lock.Lock() + defer file.lock.Unlock() + if file.flags&fileError == 0 { + file.flags |= fileError + } + } + + if err != nil { + for file := range pack.files { + markFileError(file, err) + } + return + } + + rd := bytes.NewReader(packData) + + for blobID, blob := range blobs { + blobData, err := r.loadBlob(rd, blobID, blob.offset-start, blob.length) + if err != nil { + for file := range blob.files { + markFileError(file, err) + } + continue + } + for file, offsets := range blob.files { + for _, offset := range offsets { + writeToFile := func() error { + // this looks overly complicated and needs explanation + // two competing requirements: + // - must create the file once and only once + // - should allow concurrent writes to the file + // so write the first blob while holding file lock + // write other blobs after releasing the lock + file.lock.Lock() + create := file.flags&fileProgress == 0 + if create { + defer file.lock.Unlock() + file.flags |= fileProgress + } else { + file.lock.Unlock() + } + return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, create) + } + err := writeToFile() + if err != nil { + markFileError(file, err) + break + } + } + } } } -func (r *fileRestorer) loadBlob(rd io.ReaderAt, blob restic.Blob) ([]byte, error) { +func (r *fileRestorer) loadBlob(rd io.ReaderAt, blobID restic.ID, offset int64, length int) ([]byte, error) { // TODO reconcile with Repository#loadBlob implementation - buf := make([]byte, blob.Length) + buf := make([]byte, length) - n, err := rd.ReadAt(buf, int64(blob.Offset)) + n, err := rd.ReadAt(buf, offset) if err != nil { return nil, err } - if n != int(blob.Length) { - return nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blob.ID.Str(), blob.Length, n) + if n != length { + return nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blobID.Str(), length, n) } // decrypt nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():] plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil) if err != nil { - return nil, errors.Errorf("decrypting blob %v failed: %v", blob.ID, err) + return nil, errors.Errorf("decrypting blob %v failed: %v", blobID, err) } // check hash - if !restic.Hash(plaintext).Equal(blob.ID) { - return nil, errors.Errorf("blob %v returned invalid hash", blob.ID) + if !restic.Hash(plaintext).Equal(blobID) { + return nil, errors.Errorf("blob %v returned invalid hash", blobID) } return plaintext, nil diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go index db683b10e..9d362b13c 100644 --- a/internal/restorer/filerestorer_test.go +++ b/internal/restorer/filerestorer_test.go @@ -8,7 +8,6 @@ import ( "testing" "github.com/restic/restic/internal/crypto" - "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" ) @@ -38,9 +37,6 @@ type TestRepo struct { // loader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error - - // - idx filePackTraverser } func (i *TestRepo) Lookup(blobID restic.ID, _ restic.BlobType) ([]restic.PackedBlob, bool) { @@ -56,11 +52,6 @@ func (i *TestRepo) packID(name string) restic.ID { return i.packsNameToID[name] } -func (i *TestRepo) pack(queue *packQueue, name string) *packInfo { - id := i.packsNameToID[name] - return queue.packs[id] -} - func (i *TestRepo) fileContent(file *fileInfo) string { return i.filesPathToContent[file.location] } @@ -147,7 +138,6 @@ func newTestRepo(content []TestFile) *TestRepo { files: files, filesPathToContent: filesPathToContent, } - repo.idx = filePackTraverser{lookup: repo.Lookup} repo.loader = func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { packID, err := restic.ParseID(h.Name) if err != nil { @@ -163,12 +153,11 @@ func newTestRepo(content []TestFile) *TestRepo { func restoreAndVerify(t *testing.T, tempdir string, content []TestFile) { repo := newTestRepo(content) - r := newFileRestorer(tempdir, repo.loader, repo.key, repo.idx) + r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup) r.files = repo.files - r.restoreFiles(context.TODO(), func(path string, err error) { - rtest.OK(t, errors.Wrapf(err, "unexpected error")) - }) + err := r.restoreFiles(context.TODO()) + rtest.OK(t, err) for _, file := range repo.files { target := r.targetPath(file.location) @@ -178,16 +167,11 @@ func restoreAndVerify(t *testing.T, tempdir string, content []TestFile) { continue } - _, contains := r.filesWriter.cache[target] - rtest.Equals(t, false, contains) - content := repo.fileContent(file) if !bytes.Equal(data, []byte(content)) { t.Errorf("file %v has wrong content: want %q, got %q", file.location, content, data) } } - - rtest.OK(t, nil) } func TestFileRestorerBasic(t *testing.T) { @@ -209,5 +193,13 @@ func TestFileRestorerBasic(t *testing.T) { TestBlob{"data2-2", "pack2-2"}, }, }, + TestFile{ + name: "file3", + blobs: []TestBlob{ + // same blob multiple times + TestBlob{"data3-1", "pack3-1"}, + TestBlob{"data3-1", "pack3-1"}, + }, + }, }) } diff --git a/internal/restorer/fileswriter.go b/internal/restorer/fileswriter.go index b3beb8161..8d632cd09 100644 --- a/internal/restorer/fileswriter.go +++ b/internal/restorer/fileswriter.go @@ -4,98 +4,89 @@ import ( "os" "sync" - "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/errors" + "github.com/cespare/xxhash" ) -// Writes blobs to output files. Each file is written sequentially, -// start to finish, but multiple files can be written to concurrently. -// Implementation allows virtually unlimited number of logically open -// files, but number of phisically open files will never exceed number -// of concurrent writeToFile invocations plus cacheCap. +// writes blobs to target files. +// multiple files can be written to concurrently. +// multiple blobs can be concurrently written to the same file. +// TODO I am not 100% convinced this is necessary, i.e. it may be okay +// to use multiple os.File to write to the same target file type filesWriter struct { - lock sync.Mutex // guards concurrent access to open files cache - inprogress map[string]struct{} // (logically) opened file writers - cache map[string]*os.File // cache of open files - cacheCap int // max number of cached open files + buckets []filesWriterBucket } -func newFilesWriter(cacheCap int) *filesWriter { +type filesWriterBucket struct { + lock sync.Mutex + files map[string]*os.File + users map[string]int +} + +func newFilesWriter(count int) *filesWriter { + buckets := make([]filesWriterBucket, count) + for b := 0; b < count; b++ { + buckets[b].files = make(map[string]*os.File) + buckets[b].users = make(map[string]int) + } return &filesWriter{ - inprogress: make(map[string]struct{}), - cache: make(map[string]*os.File), - cacheCap: cacheCap, + buckets: buckets, } } -func (w *filesWriter) writeToFile(path string, blob []byte) error { - // First writeToFile invocation for any given path will: - // - create and open the file - // - write the blob to the file - // - cache the open file if there is space, close the file otherwise - // Subsequent invocations will: - // - remove the open file from the cache _or_ open the file for append - // - write the blob to the file - // - cache the open file if there is space, close the file otherwise - // The idea is to cap maximum number of open files with minimal - // coordination among concurrent writeToFile invocations (note that - // writeToFile never touches somebody else's open file). +func (w *filesWriter) writeToFile(path string, blob []byte, offset int64, create bool) error { + bucket := &w.buckets[uint(xxhash.Sum64String(path))%uint(len(w.buckets))] - // TODO measure if caching is useful (likely depends on operating system - // and hardware configuration) acquireWriter := func() (*os.File, error) { - w.lock.Lock() - defer w.lock.Unlock() - if wr, ok := w.cache[path]; ok { - debug.Log("Used cached writer for %s", path) - delete(w.cache, path) + bucket.lock.Lock() + defer bucket.lock.Unlock() + + if wr, ok := bucket.files[path]; ok { + bucket.users[path]++ return wr, nil } + var flags int - if _, append := w.inprogress[path]; append { - flags = os.O_APPEND | os.O_WRONLY - } else { - w.inprogress[path] = struct{}{} + if create { flags = os.O_CREATE | os.O_TRUNC | os.O_WRONLY + } else { + flags = os.O_WRONLY } + wr, err := os.OpenFile(path, flags, 0600) if err != nil { return nil, err } - debug.Log("Opened writer for %s", path) + + bucket.files[path] = wr + bucket.users[path] = 1 + return wr, nil } - cacheOrCloseWriter := func(wr *os.File) { - w.lock.Lock() - defer w.lock.Unlock() - if len(w.cache) < w.cacheCap { - w.cache[path] = wr - } else { - wr.Close() + + releaseWriter := func(wr *os.File) error { + bucket.lock.Lock() + defer bucket.lock.Unlock() + + if bucket.users[path] == 1 { + delete(bucket.files, path) + delete(bucket.users, path) + return wr.Close() } + bucket.users[path]-- + return nil } wr, err := acquireWriter() if err != nil { return err } - n, err := wr.Write(blob) - cacheOrCloseWriter(wr) + + _, err = wr.WriteAt(blob, offset) + if err != nil { + releaseWriter(wr) return err } - if n != len(blob) { - return errors.Errorf("error writing file %v: wrong length written, want %d, got %d", path, len(blob), n) - } - return nil -} -func (w *filesWriter) close(path string) { - w.lock.Lock() - defer w.lock.Unlock() - if wr, ok := w.cache[path]; ok { - wr.Close() - delete(w.cache, path) - } - delete(w.inprogress, path) + return releaseWriter(wr) } diff --git a/internal/restorer/fileswriter_test.go b/internal/restorer/fileswriter_test.go index ada7f2107..690826534 100644 --- a/internal/restorer/fileswriter_test.go +++ b/internal/restorer/fileswriter_test.go @@ -16,23 +16,21 @@ func TestFilesWriterBasic(t *testing.T) { f1 := dir + "/f1" f2 := dir + "/f2" - rtest.OK(t, w.writeToFile(f1, []byte{1})) - rtest.Equals(t, 1, len(w.cache)) - rtest.Equals(t, 1, len(w.inprogress)) + rtest.OK(t, w.writeToFile(f1, []byte{1}, 0, true)) + rtest.Equals(t, 0, len(w.buckets[0].files)) + rtest.Equals(t, 0, len(w.buckets[0].users)) - rtest.OK(t, w.writeToFile(f2, []byte{2})) - rtest.Equals(t, 1, len(w.cache)) - rtest.Equals(t, 2, len(w.inprogress)) + rtest.OK(t, w.writeToFile(f2, []byte{2}, 0, true)) + rtest.Equals(t, 0, len(w.buckets[0].files)) + rtest.Equals(t, 0, len(w.buckets[0].users)) - rtest.OK(t, w.writeToFile(f1, []byte{1})) - w.close(f1) - rtest.Equals(t, 0, len(w.cache)) - rtest.Equals(t, 1, len(w.inprogress)) + rtest.OK(t, w.writeToFile(f1, []byte{1}, 1, false)) + rtest.Equals(t, 0, len(w.buckets[0].files)) + rtest.Equals(t, 0, len(w.buckets[0].users)) - rtest.OK(t, w.writeToFile(f2, []byte{2})) - w.close(f2) - rtest.Equals(t, 0, len(w.cache)) - rtest.Equals(t, 0, len(w.inprogress)) + rtest.OK(t, w.writeToFile(f2, []byte{2}, 1, false)) + rtest.Equals(t, 0, len(w.buckets[0].files)) + rtest.Equals(t, 0, len(w.buckets[0].users)) buf, err := ioutil.ReadFile(f1) rtest.OK(t, err) diff --git a/internal/restorer/packcache.go b/internal/restorer/packcache.go deleted file mode 100644 index 1eaad63bf..000000000 --- a/internal/restorer/packcache.go +++ /dev/null @@ -1,243 +0,0 @@ -package restorer - -import ( - "io" - "sync" - - "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/restic" -) - -// packCache is thread safe in-memory cache of pack files required to restore -// one or more files. The cache is meant to hold pack files that cannot be -// fully used right away. This happens when pack files contains blobs from -// "head" of some files and "middle" of other files. "Middle" blobs cannot be -// written to their files until after blobs from some other packs are written -// to the files first. -// -// While the cache is thread safe, implementation assumes (and enforces) -// that individual entries are used by one client at a time. Clients must -// #Close() entry's reader to make the entry available for use by other -// clients. This limitation can be relaxed in the future if necessary. -type packCache struct { - // guards access to cache internal data structures - lock sync.Mutex - - // cache capacity - capacity int - reservedCapacity int - allocatedCapacity int - - // pack records currently being used by active restore worker - reservedPacks map[restic.ID]*packCacheRecord - - // unused allocated packs, can be deleted if necessary - cachedPacks map[restic.ID]*packCacheRecord -} - -type packCacheRecord struct { - master *packCacheRecord - cache *packCache - - id restic.ID // cached pack id - offset int64 // cached pack byte range - - data []byte -} - -type readerAtCloser interface { - io.Closer - io.ReaderAt -} - -type bytesWriteSeeker struct { - pos int - data []byte -} - -func (wr *bytesWriteSeeker) Write(p []byte) (n int, err error) { - if wr.pos+len(p) > len(wr.data) { - return -1, errors.Errorf("not enough space") - } - n = copy(wr.data[wr.pos:], p) - wr.pos += n - return n, nil -} - -func (wr *bytesWriteSeeker) Seek(offset int64, whence int) (int64, error) { - if offset != 0 || whence != io.SeekStart { - return -1, errors.Errorf("unsupported seek request") - } - wr.pos = 0 - return 0, nil -} - -func newPackCache(capacity int) *packCache { - return &packCache{ - capacity: capacity, - reservedPacks: make(map[restic.ID]*packCacheRecord), - cachedPacks: make(map[restic.ID]*packCacheRecord), - } -} - -func (c *packCache) reserve(packID restic.ID, offset int64, length int) (record *packCacheRecord, err error) { - c.lock.Lock() - defer c.lock.Unlock() - - if offset < 0 || length <= 0 { - return nil, errors.Errorf("illegal pack cache allocation range %s {offset: %d, length: %d}", packID.Str(), offset, length) - } - - if c.reservedCapacity+length > c.capacity { - return nil, errors.Errorf("not enough cache capacity: requested %d, available %d", length, c.capacity-c.reservedCapacity) - } - - if _, ok := c.reservedPacks[packID]; ok { - return nil, errors.Errorf("pack is already reserved %s", packID.Str()) - } - - // the pack is available in the cache and currently unused - if pack, ok := c.cachedPacks[packID]; ok { - // check if cached pack includes requested byte range - // the range can shrink, but it never grows bigger unless there is a bug elsewhere - if pack.offset > offset || (pack.offset+int64(len(pack.data))) < (offset+int64(length)) { - return nil, errors.Errorf("cached range %d-%d is smaller than requested range %d-%d for pack %s", pack.offset, pack.offset+int64(len(pack.data)), length, offset+int64(length), packID.Str()) - } - - // move the pack to the used map - delete(c.cachedPacks, packID) - c.reservedPacks[packID] = pack - c.reservedCapacity += len(pack.data) - - debug.Log("Using cached pack %s (%d bytes)", pack.id.Str(), len(pack.data)) - - if pack.offset != offset || len(pack.data) != length { - // restrict returned record to requested range - return &packCacheRecord{ - cache: c, - master: pack, - offset: offset, - data: pack.data[int(offset-pack.offset) : int(offset-pack.offset)+length], - }, nil - } - - return pack, nil - } - - for c.allocatedCapacity+length > c.capacity { - // all cached packs will be needed at some point - // so it does not matter which one to purge - for _, cached := range c.cachedPacks { - delete(c.cachedPacks, cached.id) - c.allocatedCapacity -= len(cached.data) - debug.Log("dropped cached pack %s (%d bytes)", cached.id.Str(), len(cached.data)) - break - } - } - - pack := &packCacheRecord{ - cache: c, - id: packID, - offset: offset, - } - c.reservedPacks[pack.id] = pack - c.allocatedCapacity += length - c.reservedCapacity += length - - return pack, nil -} - -// get returns reader of the specified cached pack. Uses provided load func -// to download pack content if necessary. -// The returned reader is only able to read pack within byte range specified -// by offset and length parameters, attempts to read outside that range will -// result in an error. -// The returned reader must be closed before the same packID can be requested -// from the cache again. -func (c *packCache) get(packID restic.ID, offset int64, length int, load func(offset int64, length int, wr io.WriteSeeker) error) (readerAtCloser, error) { - pack, err := c.reserve(packID, offset, length) - if err != nil { - return nil, err - } - - if pack.data == nil { - releasePack := func() { - delete(c.reservedPacks, pack.id) - c.reservedCapacity -= length - c.allocatedCapacity -= length - } - wr := &bytesWriteSeeker{data: make([]byte, length)} - err = load(offset, length, wr) - if err != nil { - releasePack() - return nil, err - } - if wr.pos != length { - releasePack() - return nil, errors.Errorf("invalid read size") - } - pack.data = wr.data - debug.Log("Downloaded and cached pack %s (%d bytes)", pack.id.Str(), len(pack.data)) - } - - return pack, nil -} - -// releases the pack record back to the cache -func (c *packCache) release(pack *packCacheRecord) error { - c.lock.Lock() - defer c.lock.Unlock() - - if _, ok := c.reservedPacks[pack.id]; !ok { - return errors.Errorf("invalid pack release request") - } - - delete(c.reservedPacks, pack.id) - c.cachedPacks[pack.id] = pack - c.reservedCapacity -= len(pack.data) - - return nil -} - -// remove removes specified pack from the cache and frees -// corresponding cache space. should be called after the pack -// was fully used up by the restorer. -func (c *packCache) remove(packID restic.ID) error { - c.lock.Lock() - defer c.lock.Unlock() - - if _, ok := c.reservedPacks[packID]; ok { - return errors.Errorf("invalid pack remove request, pack %s is reserved", packID.Str()) - } - - pack, ok := c.cachedPacks[packID] - if !ok { - return errors.Errorf("invalid pack remove request, pack %s is not cached", packID.Str()) - } - - delete(c.cachedPacks, pack.id) - c.allocatedCapacity -= len(pack.data) - - return nil -} - -// ReadAt reads len(b) bytes from the pack starting at byte offset off. -// It returns the number of bytes read and the error, if any. -func (r *packCacheRecord) ReadAt(b []byte, off int64) (n int, err error) { - if off < r.offset || off+int64(len(b)) > r.offset+int64(len(r.data)) { - return -1, errors.Errorf("read outside available range") - } - return copy(b, r.data[off-r.offset:]), nil -} - -// Close closes the pack reader and releases corresponding cache record -// to the cache. Once closed, the record can be reused by subsequent -// requests for the same packID or it can be purged from the cache to make -// room for other packs -func (r *packCacheRecord) Close() (err error) { - if r.master != nil { - return r.cache.release(r.master) - } - return r.cache.release(r) -} diff --git a/internal/restorer/packcache_test.go b/internal/restorer/packcache_test.go deleted file mode 100644 index 3a5f18cf5..000000000 --- a/internal/restorer/packcache_test.go +++ /dev/null @@ -1,305 +0,0 @@ -package restorer - -import ( - "io" - "testing" - - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/restic" - rtest "github.com/restic/restic/internal/test" -) - -func assertNotOK(t *testing.T, msg string, err error) { - rtest.Assert(t, err != nil, msg+" did not fail") -} - -func TestBytesWriterSeeker(t *testing.T) { - wr := &bytesWriteSeeker{data: make([]byte, 10)} - - n, err := wr.Write([]byte{1, 2}) - rtest.OK(t, err) - rtest.Equals(t, 2, n) - rtest.Equals(t, []byte{1, 2}, wr.data[0:2]) - - n64, err := wr.Seek(0, io.SeekStart) - rtest.OK(t, err) - rtest.Equals(t, int64(0), n64) - - n, err = wr.Write([]byte{0, 1, 2, 3, 4}) - rtest.OK(t, err) - rtest.Equals(t, 5, n) - n, err = wr.Write([]byte{5, 6, 7, 8, 9}) - rtest.OK(t, err) - rtest.Equals(t, 5, n) - rtest.Equals(t, []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, wr.data) - - // negative tests - _, err = wr.Write([]byte{1}) - assertNotOK(t, "write overflow", err) - _, err = wr.Seek(1, io.SeekStart) - assertNotOK(t, "unsupported seek", err) -} - -func TestPackCacheBasic(t *testing.T) { - assertReader := func(expected []byte, offset int64, rd io.ReaderAt) { - actual := make([]byte, len(expected)) - rd.ReadAt(actual, offset) - rtest.Equals(t, expected, actual) - } - - c := newPackCache(10) - - id := restic.NewRandomID() - - // load pack to the cache - rd, err := c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { - rtest.Equals(t, int64(10), offset) - rtest.Equals(t, 5, length) - wr.Write([]byte{1, 2, 3, 4, 5}) - return nil - }) - rtest.OK(t, err) - assertReader([]byte{1, 2, 3, 4, 5}, 10, rd) - - // must close pack reader before can request it again - _, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected cache load call") - return nil - }) - assertNotOK(t, "double-reservation", err) - - // close the pack reader and get it from cache - rd.Close() - rd, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected cache load call") - return nil - }) - rtest.OK(t, err) - assertReader([]byte{1, 2, 3, 4, 5}, 10, rd) - - // close the pack reader and remove the pack from cache, assert the pack is loaded on request - rd.Close() - c.remove(id) - rd, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { - rtest.Equals(t, int64(10), offset) - rtest.Equals(t, 5, length) - wr.Write([]byte{1, 2, 3, 4, 5}) - return nil - }) - rtest.OK(t, err) - assertReader([]byte{1, 2, 3, 4, 5}, 10, rd) -} - -func TestPackCacheInvalidRange(t *testing.T) { - c := newPackCache(10) - - id := restic.NewRandomID() - - _, err := c.get(id, -1, 1, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected cache load call") - return nil - }) - assertNotOK(t, "negative offset request", err) - - _, err = c.get(id, 0, 0, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected cache load call") - return nil - }) - assertNotOK(t, "zero length request", err) - - _, err = c.get(id, 0, -1, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected cache load call") - return nil - }) - assertNotOK(t, "negative length", err) -} - -func TestPackCacheCapacity(t *testing.T) { - c := newPackCache(10) - - id1, id2, id3 := restic.NewRandomID(), restic.NewRandomID(), restic.NewRandomID() - - // load and reserve pack1 - rd1, err := c.get(id1, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1, 2, 3, 4, 5}) - return nil - }) - rtest.OK(t, err) - - // load and reserve pack2 - _, err = c.get(id2, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1, 2, 3, 4, 5}) - return nil - }) - rtest.OK(t, err) - - // can't load pack3 because not enough space in the cache - _, err = c.get(id3, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected cache load call") - return nil - }) - assertNotOK(t, "request over capacity", err) - - // release pack1 and try again - rd1.Close() - rd3, err := c.get(id3, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1, 2, 3, 4, 5}) - return nil - }) - rtest.OK(t, err) - - // release pack3 and load pack1 (should not come from cache) - rd3.Close() - loaded := false - rd1, err = c.get(id1, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1, 2, 3, 4, 5}) - loaded = true - return nil - }) - rtest.OK(t, err) - rtest.Equals(t, true, loaded) -} - -func TestPackCacheDownsizeRecord(t *testing.T) { - c := newPackCache(10) - - id := restic.NewRandomID() - - // get bigger range first - rd, err := c.get(id, 5, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1, 2, 3, 4, 5}) - return nil - }) - rtest.OK(t, err) - rd.Close() - - // invalid "resize" requests - _, err = c.get(id, 5, 10, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - assertNotOK(t, "resize cached record", err) - - // invalid before cached range request - _, err = c.get(id, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - assertNotOK(t, "before cached range request", err) - - // invalid after cached range request - _, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - assertNotOK(t, "after cached range request", err) - - // now get smaller "nested" range - rd, err = c.get(id, 7, 1, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - rtest.OK(t, err) - - // assert expected data - buf := make([]byte, 1) - rd.ReadAt(buf, 7) - rtest.Equals(t, byte(3), buf[0]) - _, err = rd.ReadAt(buf, 0) - assertNotOK(t, "read before downsized pack range", err) - _, err = rd.ReadAt(buf, 9) - assertNotOK(t, "read after downsized pack range", err) - - // can't request downsized record again - _, err = c.get(id, 7, 1, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - assertNotOK(t, "double-allocation of cache record subrange", err) - - // can't request another subrange of the original record - _, err = c.get(id, 6, 1, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - assertNotOK(t, "allocation of another subrange of cache record", err) - - // release downsized record and assert the original is back in the cache - rd.Close() - rd, err = c.get(id, 5, 5, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - rtest.OK(t, err) - rd.Close() -} - -func TestPackCacheFailedDownload(t *testing.T) { - c := newPackCache(10) - assertEmpty := func() { - rtest.Equals(t, 0, len(c.cachedPacks)) - rtest.Equals(t, 10, c.capacity) - rtest.Equals(t, 0, c.reservedCapacity) - rtest.Equals(t, 0, c.allocatedCapacity) - } - - _, err := c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - return errors.Errorf("expected induced test error") - }) - assertNotOK(t, "not enough bytes read", err) - assertEmpty() - - _, err = c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1}) - return nil - }) - assertNotOK(t, "not enough bytes read", err) - assertEmpty() - - _, err = c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1, 2, 3, 4, 5, 6}) - return nil - }) - assertNotOK(t, "too many bytes read", err) - assertEmpty() -} - -func TestPackCacheInvalidRequests(t *testing.T) { - c := newPackCache(10) - - id := restic.NewRandomID() - - // - rd, _ := c.get(id, 0, 1, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1}) - return nil - }) - assertNotOK(t, "remove() reserved pack", c.remove(id)) - rtest.OK(t, rd.Close()) - assertNotOK(t, "multiple reader Close() calls)", rd.Close()) - - // - rtest.OK(t, c.remove(id)) - assertNotOK(t, "double remove() the same pack", c.remove(id)) -} - -func TestPackCacheRecord(t *testing.T) { - rd := &packCacheRecord{ - offset: 10, - data: []byte{1}, - } - buf := make([]byte, 1) - n, err := rd.ReadAt(buf, 10) - rtest.OK(t, err) - rtest.Equals(t, 1, n) - rtest.Equals(t, byte(1), buf[0]) - - _, err = rd.ReadAt(buf, 0) - assertNotOK(t, "read before loaded range", err) - - _, err = rd.ReadAt(buf, 11) - assertNotOK(t, "read after loaded range", err) - - _, err = rd.ReadAt(make([]byte, 2), 10) - assertNotOK(t, "read more than available data", err) -} diff --git a/internal/restorer/packheap.go b/internal/restorer/packheap.go deleted file mode 100644 index 9f8443d46..000000000 --- a/internal/restorer/packheap.go +++ /dev/null @@ -1,51 +0,0 @@ -package restorer - -// packHeap is a heap of packInfo references -// @see https://golang.org/pkg/container/heap/ -// @see https://en.wikipedia.org/wiki/Heap_(data_structure) -type packHeap struct { - elements []*packInfo - - // returns true if download of any of the files is in progress - inprogress func(files map[*fileInfo]struct{}) bool -} - -func (pq *packHeap) Len() int { return len(pq.elements) } - -func (pq *packHeap) Less(a, b int) bool { - packA, packB := pq.elements[a], pq.elements[b] - - ap := pq.inprogress(packA.files) - bp := pq.inprogress(packB.files) - if ap && !bp { - return true - } - - if packA.cost < packB.cost { - return true - } - - return false -} - -func (pq *packHeap) Swap(i, j int) { - pq.elements[i], pq.elements[j] = pq.elements[j], pq.elements[i] - pq.elements[i].index = i - pq.elements[j].index = j -} - -func (pq *packHeap) Push(x interface{}) { - n := len(pq.elements) - item := x.(*packInfo) - item.index = n - pq.elements = append(pq.elements, item) -} - -func (pq *packHeap) Pop() interface{} { - old := pq.elements - n := len(old) - item := old[n-1] - item.index = -1 // for safety - pq.elements = old[0 : n-1] - return item -} diff --git a/internal/restorer/packqueue.go b/internal/restorer/packqueue.go deleted file mode 100644 index fe8259846..000000000 --- a/internal/restorer/packqueue.go +++ /dev/null @@ -1,224 +0,0 @@ -package restorer - -import ( - "container/heap" - - "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/restic" -) - -// packQueue tracks remaining file contents restore work and decides what pack -// to download and files to write next. -// -// The packs in the queue can be in one of three states: waiting, ready and -// in-progress. -// Waiting packs are the packs that only have blobs from the "middle" of their -// corresponding files and therefore cannot be used until blobs from some other -// packs are written to the files first. -// In-progress packs are the packs that were removed from the queue by #nextPack -// and must be first returned to the queue before they are considered again. -// Ready packs are the packs can be immediately used to restore at least one -// file. Internally ready packs are kept in a heap and are ordered according -// to these criteria: -// - Packs with "head" blobs of in-progress files are considered first. The -// idea is to complete restore of in-progress files before starting restore -// of other files. This is both more intuitive and also reduces number of -// open file handles needed during restore. -// - Packs with smallest cost are considered next. Pack cost is measured in -// number of other packs required before all blobs in the pack can be used -// and the pack can be removed from the pack cache. -// For example, consisder a file that requires two blobs, blob1 from pack1 -// and blob2 from pack2. The cost of pack2 is 1, because blob2 cannot be -// used before blob1 is available. The higher the cost, the longer the pack -// must be cached locally to avoid redownload. -// -// Pack queue implementation is NOT thread safe. All pack queue methods must -// be called from single gorouting AND packInfo and fileInfo instances must -// be updated synchronously from the same gorouting. -type packQueue struct { - idx filePackTraverser - - packs map[restic.ID]*packInfo // waiting and ready packs - inprogress map[*packInfo]struct{} // inprogress packs - - heap *packHeap // heap of ready packs -} - -func newPackQueue(idx filePackTraverser, files []*fileInfo, inprogress func(files map[*fileInfo]struct{}) bool) (*packQueue, error) { - packs := make(map[restic.ID]*packInfo) // all packs - - // create packInfo from fileInfo - for _, file := range files { - err := idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool { - pack, ok := packs[packID] - if !ok { - pack = &packInfo{ - id: packID, - index: -1, - files: make(map[*fileInfo]struct{}), - } - packs[packID] = pack - } - pack.files[file] = struct{}{} - pack.cost += packIdx - - return true // keep going - }) - if err != nil { - // repository index is messed up, can't do anything - return nil, err - } - } - - // create packInfo heap - pheap := &packHeap{inprogress: inprogress} - headPacks := restic.NewIDSet() - for _, file := range files { - idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool { - if !headPacks.Has(packID) { - headPacks.Insert(packID) - pack := packs[packID] - pack.index = len(pheap.elements) - pheap.elements = append(pheap.elements, pack) - } - return false // only first pack - }) - } - heap.Init(pheap) - - return &packQueue{idx: idx, packs: packs, heap: pheap, inprogress: make(map[*packInfo]struct{})}, nil -} - -// isEmpty returns true if the queue is empty, i.e. there are no more packs to -// download and files to write to. -func (h *packQueue) isEmpty() bool { - return len(h.packs) == 0 && len(h.inprogress) == 0 -} - -// nextPack returns next ready pack and corresponding files ready for download -// and processing. The returned pack and the files are marked as "in progress" -// internally and must be first returned to the queue before they are -// considered by #nextPack again. -func (h *packQueue) nextPack() (*packInfo, []*fileInfo) { - debug.Log("Ready packs %d, outstanding packs %d, inprogress packs %d", h.heap.Len(), len(h.packs), len(h.inprogress)) - - if h.heap.Len() == 0 { - return nil, nil - } - - pack := heap.Pop(h.heap).(*packInfo) - h.inprogress[pack] = struct{}{} - debug.Log("Popped pack %s (%d files), heap size=%d", pack.id.Str(), len(pack.files), len(h.heap.elements)) - var files []*fileInfo - for file := range pack.files { - h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { - debug.Log("Pack #%d %s (%d blobs) used by %s", packIdx, packID.Str(), len(packBlobs), file.location) - if pack.id == packID { - files = append(files, file) - } - return false // only interested in the fist pack here - }) - } - - return pack, files -} - -// requeuePack conditionally adds back to the queue pack previously returned by -// #nextPack. -// If the pack is needed to restore any incomplete files, adds the pack to the -// queue and adjusts order of all affected packs in the queue. Has no effect -// if the pack is not required to restore any files. -// Returns true if the pack was added to the queue, false otherwise. -func (h *packQueue) requeuePack(pack *packInfo, success []*fileInfo, failure []*fileInfo) bool { - debug.Log("Requeue pack %s (%d/%d/%d files/success/failure)", pack.id.Str(), len(pack.files), len(success), len(failure)) - - // maintain inprogress pack set - delete(h.inprogress, pack) - - affectedPacks := make(map[*packInfo]struct{}) - affectedPacks[pack] = struct{}{} // this pack is alwats affected - - // apply download success/failure to the packs - onFailure := func(file *fileInfo) { - h.idx.forEachFilePack(file, func(packInx int, packID restic.ID, _ []restic.Blob) bool { - pack := h.packs[packID] - delete(pack.files, file) - pack.cost -= packInx - affectedPacks[pack] = struct{}{} - return true // keep going - }) - } - for _, file := range failure { - onFailure(file) - } - onSuccess := func(pack *packInfo, file *fileInfo) { - remove := true - h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool { - if packID.Equal(pack.id) { - // the pack has more blobs required by the file - remove = false - } - otherPack := h.packs[packID] - otherPack.cost-- - affectedPacks[otherPack] = struct{}{} - return true // keep going - }) - if remove { - delete(pack.files, file) - } - } - for _, file := range success { - onSuccess(pack, file) - } - - // drop/update affected packs - isReady := func(affectedPack *packInfo) (ready bool) { - for file := range affectedPack.files { - h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool { - if packID.Equal(affectedPack.id) { - ready = true - } - return false // only file's first pack matters - }) - if ready { - break - } - } - return ready - } - for affectedPack := range affectedPacks { - if _, inprogress := h.inprogress[affectedPack]; !inprogress { - if len(affectedPack.files) == 0 { - // drop the pack if it isn't inprogress and has no files that need it - if affectedPack.index >= 0 { - // This can't happen unless there is a bug elsewhere: - // - "current" pack isn't in the heap, hence its index must be < 0 - // - "other" packs can't be ready (i.e. in heap) unless they have other files - // in which case len(affectedPack.files) must be > 0 - debug.Log("corrupted ready heap: removed unexpected ready pack %s", affectedPack.id.Str()) - heap.Remove(h.heap, affectedPack.index) - } - delete(h.packs, affectedPack.id) - } else { - ready := isReady(affectedPack) - switch { - case ready && affectedPack.index < 0: - heap.Push(h.heap, affectedPack) - case ready && affectedPack.index >= 0: - heap.Fix(h.heap, affectedPack.index) - case !ready && affectedPack.index >= 0: - // This can't happen unless there is a bug elsewhere: - // - "current" pack isn't in the heap, hence its index must be < 0 - // - "other" packs can't have same head blobs as the "current" pack, - // hence "other" packs can't change their readiness - debug.Log("corrupted ready heap: removed unexpected waiting pack %s", affectedPack.id.Str()) - heap.Remove(h.heap, affectedPack.index) - case !ready && affectedPack.index < 0: - // do nothing - } - } - } - } - - return len(pack.files) > 0 -} diff --git a/internal/restorer/packqueue_test.go b/internal/restorer/packqueue_test.go deleted file mode 100644 index 880f7037a..000000000 --- a/internal/restorer/packqueue_test.go +++ /dev/null @@ -1,236 +0,0 @@ -package restorer - -import ( - "testing" - - "github.com/restic/restic/internal/restic" - rtest "github.com/restic/restic/internal/test" -) - -func processPack(t *testing.T, data *TestRepo, pack *packInfo, files []*fileInfo) { - for _, file := range files { - data.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { - // assert file's head pack - rtest.Equals(t, pack.id, packID) - file.blobs = file.blobs[len(packBlobs):] - return false // only interested in the head pack - }) - } -} - -func TestPackQueueBasic(t *testing.T) { - data := newTestRepo([]TestFile{ - TestFile{ - name: "file", - blobs: []TestBlob{ - TestBlob{"data1", "pack1"}, - TestBlob{"data2", "pack2"}, - }, - }, - }) - - queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false }) - rtest.OK(t, err) - - // assert initial queue state - rtest.Equals(t, false, queue.isEmpty()) - rtest.Equals(t, 0, queue.packs[data.packID("pack1")].cost) - rtest.Equals(t, 1, queue.packs[data.packID("pack2")].cost) - - // get first pack - pack, files := queue.nextPack() - rtest.Equals(t, "pack1", data.packName(pack)) - rtest.Equals(t, 1, len(files)) - rtest.Equals(t, false, queue.isEmpty()) - // TODO assert pack is inprogress - - // can't process the second pack until the first one is processed - { - pack, files := queue.nextPack() - rtest.Equals(t, true, pack == nil) - rtest.Equals(t, true, files == nil) - rtest.Equals(t, false, queue.isEmpty()) - } - - // requeue the pack without processing - rtest.Equals(t, true, queue.requeuePack(pack, []*fileInfo{}, []*fileInfo{})) - rtest.Equals(t, false, queue.isEmpty()) - rtest.Equals(t, 0, queue.packs[data.packID("pack1")].cost) - rtest.Equals(t, 1, queue.packs[data.packID("pack2")].cost) - - // get the first pack again - pack, files = queue.nextPack() - rtest.Equals(t, "pack1", data.packName(pack)) - rtest.Equals(t, 1, len(files)) - rtest.Equals(t, false, queue.isEmpty()) - - // process the first pack and return it to the queue - processPack(t, data, pack, files) - rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) - rtest.Equals(t, 0, queue.packs[data.packID("pack2")].cost) - - // get the second pack - pack, files = queue.nextPack() - rtest.Equals(t, "pack2", data.packName(pack)) - rtest.Equals(t, 1, len(files)) - rtest.Equals(t, false, queue.isEmpty()) - - // process the second pack and return it to the queue - processPack(t, data, pack, files) - rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) - - // all packs processed - rtest.Equals(t, true, queue.isEmpty()) -} - -func TestPackQueueFailedFile(t *testing.T) { - // point of this test is to assert that enqueuePack removes - // all references to files that failed restore - - data := newTestRepo([]TestFile{ - TestFile{ - name: "file", - blobs: []TestBlob{ - TestBlob{"data1", "pack1"}, - TestBlob{"data2", "pack2"}, - }, - }, - }) - - queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false }) - rtest.OK(t, err) - - pack, files := queue.nextPack() - rtest.Equals(t, false, queue.requeuePack(pack, []*fileInfo{}, files /*failed*/)) - rtest.Equals(t, true, queue.isEmpty()) -} - -func TestPackQueueOrderingCost(t *testing.T) { - // assert pack1 is selected before pack2: - // pack1 is ready to restore file1, pack2 is ready to restore file2 - // but pack2 cannot be immediately used to restore file1 - - data := newTestRepo([]TestFile{ - TestFile{ - name: "file1", - blobs: []TestBlob{ - TestBlob{"data1", "pack1"}, - TestBlob{"data2", "pack2"}, - }, - }, - TestFile{ - name: "file2", - blobs: []TestBlob{ - TestBlob{"data2", "pack2"}, - }, - }, - }) - - queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false }) - rtest.OK(t, err) - - // assert initial pack costs - rtest.Equals(t, 0, data.pack(queue, "pack1").cost) - rtest.Equals(t, 0, data.pack(queue, "pack1").index) // head of the heap - rtest.Equals(t, 1, data.pack(queue, "pack2").cost) - rtest.Equals(t, 1, data.pack(queue, "pack2").index) - - pack, files := queue.nextPack() - // assert selected pack and queue state - rtest.Equals(t, "pack1", data.packName(pack)) - // process the pack - processPack(t, data, pack, files) - rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) -} - -func TestPackQueueOrderingInprogress(t *testing.T) { - // finish restoring one file before starting another - - data := newTestRepo([]TestFile{ - TestFile{ - name: "file1", - blobs: []TestBlob{ - TestBlob{"data1-1", "pack1-1"}, - TestBlob{"data1-2", "pack1-2"}, - }, - }, - TestFile{ - name: "file2", - blobs: []TestBlob{ - TestBlob{"data2-1", "pack2-1"}, - TestBlob{"data2-2", "pack2-2"}, - }, - }, - }) - - var inprogress *fileInfo - queue, err := newPackQueue(data.idx, data.files, func(files map[*fileInfo]struct{}) bool { - _, found := files[inprogress] - return found - }) - rtest.OK(t, err) - - // first pack of a file - pack, files := queue.nextPack() - rtest.Equals(t, 1, len(files)) - file := files[0] - processPack(t, data, pack, files) - inprogress = files[0] - queue.requeuePack(pack, files, []*fileInfo{}) - - // second pack of the same file - pack, files = queue.nextPack() - rtest.Equals(t, 1, len(files)) - rtest.Equals(t, true, file == files[0]) // same file as before - processPack(t, data, pack, files) - inprogress = nil - queue.requeuePack(pack, files, []*fileInfo{}) - - // first pack of the second file - pack, files = queue.nextPack() - rtest.Equals(t, 1, len(files)) - rtest.Equals(t, false, file == files[0]) // different file as before -} - -func TestPackQueuePackMultiuse(t *testing.T) { - // the same pack is required multiple times to restore the same file - - data := newTestRepo([]TestFile{ - TestFile{ - name: "file", - blobs: []TestBlob{ - TestBlob{"data1", "pack1"}, - TestBlob{"data2", "pack2"}, - TestBlob{"data3", "pack1"}, // pack1 reuse, new blob - TestBlob{"data2", "pack2"}, // pack2 reuse, same blob - }, - }, - }) - - queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false }) - rtest.OK(t, err) - - pack, files := queue.nextPack() - rtest.Equals(t, "pack1", data.packName(pack)) - rtest.Equals(t, 1, len(pack.files)) - processPack(t, data, pack, files) - rtest.Equals(t, true, queue.requeuePack(pack, files, []*fileInfo{})) - - pack, files = queue.nextPack() - rtest.Equals(t, "pack2", data.packName(pack)) - rtest.Equals(t, 1, len(pack.files)) - processPack(t, data, pack, files) - rtest.Equals(t, true, queue.requeuePack(pack, files, []*fileInfo{})) - - pack, files = queue.nextPack() - rtest.Equals(t, "pack1", data.packName(pack)) - processPack(t, data, pack, files) - rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) - - pack, files = queue.nextPack() - rtest.Equals(t, "pack2", data.packName(pack)) - processPack(t, data, pack, files) - rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) - - rtest.Equals(t, true, queue.isEmpty()) -} diff --git a/internal/restorer/restorer.go b/internal/restorer/restorer.go index bbef5083b..7f497c5d6 100644 --- a/internal/restorer/restorer.go +++ b/internal/restorer/restorer.go @@ -206,7 +206,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error { idx := restic.NewHardlinkIndex() - filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), filePackTraverser{lookup: res.repo.Index().Lookup}) + filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), res.repo.Index().Lookup) // first tree pass: create directories and collect all files to restore err = res.traverseTree(ctx, dst, string(filepath.Separator), *res.sn.Tree, treeVisitor{ @@ -249,7 +249,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error { return err } - err = filerestorer.restoreFiles(ctx, func(location string, err error) { res.Error(location, err) }) + err = filerestorer.restoreFiles(ctx) if err != nil { return err }