mirror of
https://github.com/octoleo/restic.git
synced 2025-01-22 14:48:24 +00:00
Merge pull request #3176 from MichaelEischer/backend-content-length
Pass upload size to backends and sanity check it
This commit is contained in:
commit
bbdf18c4a2
@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
@ -118,6 +117,16 @@ func (be *Backend) Path() string {
|
||||
return be.prefix
|
||||
}
|
||||
|
||||
type azureAdapter struct {
|
||||
restic.RewindReader
|
||||
}
|
||||
|
||||
func (azureAdapter) Close() error { return nil }
|
||||
|
||||
func (a *azureAdapter) Len() int {
|
||||
return int(a.Length())
|
||||
}
|
||||
|
||||
// Save stores data in the backend at the handle.
|
||||
func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
if err := h.Valid(); err != nil {
|
||||
@ -135,7 +144,8 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
||||
var err error
|
||||
if rd.Length() < 256*1024*1024 {
|
||||
// wrap the reader so that net/http client cannot close the reader
|
||||
dataReader := ioutil.NopCloser(rd)
|
||||
// CreateBlockBlobFromReader reads length from `Len()``
|
||||
dataReader := azureAdapter{rd}
|
||||
|
||||
// if it's smaller than 256miB, then just create the file directly from the reader
|
||||
err = be.container.GetBlobReference(objName).CreateBlockBlobFromReader(dataReader, nil)
|
||||
@ -162,6 +172,7 @@ func (be *Backend) saveLarge(ctx context.Context, objName string, rd restic.Rewi
|
||||
// read the data, in 100 MiB chunks
|
||||
buf := make([]byte, 100*1024*1024)
|
||||
var blocks []storage.Block
|
||||
uploadedBytes := 0
|
||||
|
||||
for {
|
||||
n, err := io.ReadFull(rd, buf)
|
||||
@ -178,6 +189,7 @@ func (be *Backend) saveLarge(ctx context.Context, objName string, rd restic.Rewi
|
||||
}
|
||||
|
||||
buf = buf[:n]
|
||||
uploadedBytes += n
|
||||
|
||||
// upload it as a new "block", use the base64 hash for the ID
|
||||
h := restic.Hash(buf)
|
||||
@ -194,6 +206,11 @@ func (be *Backend) saveLarge(ctx context.Context, objName string, rd restic.Rewi
|
||||
})
|
||||
}
|
||||
|
||||
// sanity check
|
||||
if uploadedBytes != int(rd.Length()) {
|
||||
return errors.Errorf("wrote %d bytes instead of the expected %d bytes", uploadedBytes, rd.Length())
|
||||
}
|
||||
|
||||
debug.Log("uploaded %d parts: %v", len(blocks), blocks)
|
||||
err = file.PutBlockList(blocks, nil)
|
||||
debug.Log("PutBlockList returned %v", err)
|
||||
|
@ -209,6 +209,10 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd restic.Rewind
|
||||
return errors.Wrap(err, "Copy")
|
||||
}
|
||||
|
||||
// sanity check
|
||||
if n != rd.Length() {
|
||||
return errors.Errorf("wrote %d bytes instead of the expected %d bytes", n, rd.Length())
|
||||
}
|
||||
return errors.Wrap(w.Close(), "Close")
|
||||
}
|
||||
|
||||
|
@ -245,6 +245,10 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
||||
}
|
||||
|
||||
debug.Log("%v -> %v bytes", objName, wbytes)
|
||||
// sanity check
|
||||
if wbytes != rd.Length() {
|
||||
return errors.Errorf("wrote %d bytes instead of the expected %d bytes", wbytes, rd.Length())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -118,11 +118,16 @@ func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReade
|
||||
}
|
||||
|
||||
// save data, then sync
|
||||
_, err = io.Copy(f, rd)
|
||||
wbytes, err := io.Copy(f, rd)
|
||||
if err != nil {
|
||||
_ = f.Close()
|
||||
return errors.Wrap(err, "Write")
|
||||
}
|
||||
// sanity check
|
||||
if wbytes != rd.Length() {
|
||||
_ = f.Close()
|
||||
return errors.Errorf("wrote %d bytes instead of the expected %d bytes", wbytes, rd.Length())
|
||||
}
|
||||
|
||||
if err = f.Sync(); err != nil {
|
||||
pathErr, ok := err.(*os.PathError)
|
||||
|
@ -84,6 +84,11 @@ func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd restic.Re
|
||||
be.data[h] = buf
|
||||
debug.Log("saved %v bytes at %v", len(buf), h)
|
||||
|
||||
// sanity check
|
||||
if int64(len(buf)) != rd.Length() {
|
||||
return errors.Errorf("wrote %d bytes instead of the expected %d bytes", len(buf), rd.Length())
|
||||
}
|
||||
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
|
@ -272,9 +272,14 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
||||
opts.ContentType = "application/octet-stream"
|
||||
|
||||
debug.Log("PutObject(%v, %v, %v)", be.cfg.Bucket, objName, rd.Length())
|
||||
n, err := be.client.PutObject(ctx, be.cfg.Bucket, objName, ioutil.NopCloser(rd), int64(rd.Length()), opts)
|
||||
info, err := be.client.PutObject(ctx, be.cfg.Bucket, objName, ioutil.NopCloser(rd), int64(rd.Length()), opts)
|
||||
|
||||
debug.Log("%v -> %v bytes, err %#v: %v", objName, n, err, err)
|
||||
debug.Log("%v -> %v bytes, err %#v: %v", objName, info.Size, err, err)
|
||||
|
||||
// sanity check
|
||||
if err != nil && info.Size != rd.Length() {
|
||||
return errors.Errorf("wrote %d bytes instead of the expected %d bytes", info.Size, rd.Length())
|
||||
}
|
||||
|
||||
return errors.Wrap(err, "client.PutObject")
|
||||
}
|
||||
|
@ -288,12 +288,18 @@ func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader
|
||||
}
|
||||
|
||||
// save data, make sure to use the optimized sftp upload method
|
||||
_, err = f.ReadFrom(rd)
|
||||
wbytes, err := f.ReadFrom(rd)
|
||||
if err != nil {
|
||||
_ = f.Close()
|
||||
return errors.Wrap(err, "Write")
|
||||
}
|
||||
|
||||
// sanity check
|
||||
if wbytes != rd.Length() {
|
||||
_ = f.Close()
|
||||
return errors.Errorf("wrote %d bytes instead of the expected %d bytes", wbytes, rd.Length())
|
||||
}
|
||||
|
||||
err = f.Close()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Close")
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@ -176,7 +177,9 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
||||
encoding := "binary/octet-stream"
|
||||
|
||||
debug.Log("PutObject(%v, %v, %v)", be.container, objName, encoding)
|
||||
_, err := be.conn.ObjectPut(be.container, objName, rd, true, "", encoding, nil)
|
||||
hdr := swift.Headers{"Content-Length": strconv.FormatInt(rd.Length(), 10)}
|
||||
_, err := be.conn.ObjectPut(be.container, objName, rd, true, "", encoding, hdr)
|
||||
// swift does not return the upload length
|
||||
debug.Log("%v, err %#v", objName, err)
|
||||
|
||||
return errors.Wrap(err, "client.PutObject")
|
||||
|
@ -557,6 +557,38 @@ func (s *Suite) TestSave(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
type incompleteByteReader struct {
|
||||
restic.ByteReader
|
||||
}
|
||||
|
||||
func (r *incompleteByteReader) Length() int64 {
|
||||
return r.ByteReader.Length() + 42
|
||||
}
|
||||
|
||||
// TestSaveError tests saving data in the backend.
|
||||
func (s *Suite) TestSaveError(t *testing.T) {
|
||||
seedRand(t)
|
||||
|
||||
b := s.open(t)
|
||||
defer func() {
|
||||
// rclone will report an error when closing the backend. We have to ignore it
|
||||
// otherwise this test will always fail
|
||||
_ = b.Close()
|
||||
}()
|
||||
|
||||
length := rand.Intn(1<<23) + 200000
|
||||
data := test.Random(23, length)
|
||||
var id restic.ID
|
||||
copy(id[:], data)
|
||||
|
||||
// test that incomplete uploads fail
|
||||
h := restic.Handle{Type: restic.PackFile, Name: id.String()}
|
||||
err := b.Save(context.TODO(), h, &incompleteByteReader{ByteReader: *restic.NewByteReader(data)})
|
||||
if err == nil {
|
||||
t.Fatal("incomplete upload did not fail")
|
||||
}
|
||||
}
|
||||
|
||||
var filenameTests = []struct {
|
||||
name string
|
||||
data string
|
||||
|
Loading…
x
Reference in New Issue
Block a user