From c4a2bfcb3925f9789745d6783ab5f95758638858 Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Fri, 20 Aug 2021 23:21:05 +0200
Subject: [PATCH] repository: Add StreamPack function

The function supports efficiently loading a specified list of blobs from
a single pack in a streaming fashion. That is, there is no need for
temporary files, regardless of the pack size.
---
 internal/repository/repository.go | 85 +++++++++++++++++++++++++++++++
 1 file changed, 85 insertions(+)

diff --git a/internal/repository/repository.go b/internal/repository/repository.go
index 1ed65083c..df62f3ad7 100644
--- a/internal/repository/repository.go
+++ b/internal/repository/repository.go
@@ -1,12 +1,14 @@
 package repository
 
 import (
+	"bufio"
 	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
 	"io"
 	"os"
+	"sort"
 	"sync"
 
 	"github.com/restic/chunker"
@@ -25,6 +27,8 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
+const maxStreamBufferSize = 4 * 1024 * 1024
+
 // Repository is used to access a repository in a backend.
 type Repository struct {
 	be      restic.Backend
@@ -782,3 +786,84 @@ func DownloadAndHash(ctx context.Context, be Loader, h restic.Handle) (tmpfile *
 
 	return tmpfile, hash, size, err
 }
+
+type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
+
+func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
+	if len(blobs) == 0 {
+		// nothing to do
+		return nil
+	}
+
+	sort.Slice(blobs, func(i, j int) bool {
+		return blobs[i].Offset < blobs[j].Offset
+	})
+	h := restic.Handle{Type: restic.PackFile, Name: packID.String()}
+
+	dataStart := blobs[0].Offset
+	dataEnd := blobs[len(blobs)-1].Offset + blobs[len(blobs)-1].Length
+
+	debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs))
+
+	// stream blobs in pack
+	err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
+		bufferSize := int(dataEnd - dataStart)
+		if bufferSize > maxStreamBufferSize {
+			bufferSize = maxStreamBufferSize
+		}
+		bufRd := bufio.NewReaderSize(rd, bufferSize)
+		currentBlobEnd := dataStart
+		var buf []byte
+		for _, entry := range blobs {
+			skipBytes := int(entry.Offset - currentBlobEnd)
+			if skipBytes < 0 {
+				return errors.Errorf("overlapping blobs in pack %v", packID)
+			}
+
+			_, err := bufRd.Discard(skipBytes)
+			if err != nil {
+				return err
+			}
+
+			h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
+			debug.Log("  process blob %v, skipped %d, %v", h, skipBytes, entry)
+
+			if uint(cap(buf)) < entry.Length {
+				buf = make([]byte, entry.Length)
+			}
+			buf = buf[:entry.Length]
+
+			n, err := io.ReadFull(bufRd, buf)
+			if err != nil {
+				debug.Log("  read error %v", err)
+				return errors.Wrap(err, "ReadFull")
+			}
+
+			if n != len(buf) {
+				return errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
+					h, packID.Str(), len(buf), n)
+			}
+			currentBlobEnd = entry.Offset + entry.Length
+
+			// decryption errors are likely permanent, give the caller a chance to skip them
+			nonce, ciphertext := buf[:key.NonceSize()], buf[key.NonceSize():]
+			plaintext, err := key.Open(ciphertext[:0], nonce, ciphertext, nil)
+			if err == nil {
+				id := restic.Hash(plaintext)
+				if !id.Equal(entry.ID) {
+					debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v",
+						h.Type, h.ID, packID.Str(), id)
+					err = errors.Errorf("read blob %v from %v: wrong data returned, hash is %v",
+						h, packID.Str(), id)
+				}
+			}
+
+			err = handleBlobFn(entry.BlobHandle, plaintext, err)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+	return errors.Wrap(err, "StreamPack")
+}
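
Usage sketch for reviewers (not part of the patch): the snippet below shows one way a caller could wire StreamPack up, assuming the blob list for the pack has already been collected (for example from the repository index), and that repo.Backend().Load and repo.Key() are passed as the BackendLoadFn and key. The helper name restoreBlobsFromPack and the skip-or-abort policy in the callback are illustrative assumptions, not part of this change.

package example // hypothetical caller, not included in this patch

import (
	"context"

	"github.com/restic/restic/internal/repository"
	"github.com/restic/restic/internal/restic"
)

// restoreBlobsFromPack streams all wanted blobs of a single pack and hands the
// decrypted plaintext to a caller-provided consumer. Sketch only.
func restoreBlobsFromPack(ctx context.Context, repo *repository.Repository, packID restic.ID,
	blobs []restic.Blob, consume func(restic.BlobHandle, []byte) error) error {

	return repository.StreamPack(ctx, repo.Backend().Load, repo.Key(), packID, blobs,
		func(blob restic.BlobHandle, buf []byte, err error) error {
			if err != nil {
				// returning nil here would skip a broken blob and continue with
				// the rest of the pack; returning err aborts the whole stream
				return err
			}
			// buf is reused for the next blob, so consumers must copy the data
			// if they need it after this callback returns
			return consume(blob, buf)
		})
}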