diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 31bd76685..39953cd88 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -11,7 +11,7 @@ import ( "sort" "sync" - "github.com/cenkalti/backoff" + "github.com/cenkalti/backoff/v4" "github.com/restic/chunker" "github.com/restic/restic/internal/backend/dryrun" "github.com/restic/restic/internal/cache" @@ -746,6 +746,10 @@ type Loader interface { type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error +// StreamPack loads the listed blobs from the specified pack file. The plaintext blob is passed to +// the handleBlobFn callback or an error if decryption failed or the blob hash does not match. In +// case of download errors handleBlobFn might be called multiple times for the same blob. If the +// callback returns an error, then StreamPack will abort and not retry it. func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { if len(blobs) == 0 { // nothing to do @@ -762,8 +766,13 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs)) + ctx, cancel := context.WithCancel(ctx) // stream blobs in pack err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error { + // prevent callbacks after cancelation + if ctx.Err() != nil { + return ctx.Err() + } bufferSize := int(dataEnd - dataStart) if bufferSize > MaxStreamBufferSize { bufferSize = MaxStreamBufferSize @@ -823,6 +832,7 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack err = handleBlobFn(entry.BlobHandle, plaintext, err) if err != nil { + cancel() return backoff.Permanent(err) } }