diff --git a/changelog/unreleased/issue-1822 b/changelog/unreleased/issue-1822
new file mode 100644
index 000000000..4093c4353
--- /dev/null
+++ b/changelog/unreleased/issue-1822
@@ -0,0 +1,9 @@
+Bugfix: Allow uploading large files to MS Azure
+
+Sometimes, restic creates files to be uploaded to the repository which are
+quite large, e.g. when saving directories with many entries or very large
+files. The MS Azure API does not allow uploading files larger than 256 MiB
+directly; instead, restic needs to upload them in blocks of 100 MiB. This is
+now implemented.
+
+https://github.com/restic/restic/issues/1822
diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go
index 0b3f0b6ea..7a4617a44 100644
--- a/internal/backend/azure/azure.go
+++ b/internal/backend/azure/azure.go
@@ -2,6 +2,7 @@ package azure
 
 import (
 	"context"
+	"encoding/base64"
 	"io"
 	"io/ioutil"
 	"net/http"
@@ -64,13 +65,13 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
 }
 
 // Open opens the Azure backend at specified container.
-func Open(cfg Config, rt http.RoundTripper) (restic.Backend, error) {
+func Open(cfg Config, rt http.RoundTripper) (*Backend, error) {
 	return open(cfg, rt)
 }
 
 // Create opens the Azure backend at specified container and creates the container if
 // it does not exist yet.
-func Create(cfg Config, rt http.RoundTripper) (restic.Backend, error) {
+func Create(cfg Config, rt http.RoundTripper) (*Backend, error) {
 	be, err := open(cfg, rt)
 
 	if err != nil {
@@ -129,8 +130,17 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
 
 	debug.Log("InsertObject(%v, %v)", be.container.Name, objName)
 
-	// wrap the reader so that net/http client cannot close the reader
-	err := be.container.GetBlobReference(objName).CreateBlockBlobFromReader(ioutil.NopCloser(rd), nil)
+	var err error
+	if rd.Length() < 256*1024*1024 {
+		// wrap the reader so that the net/http client cannot close the reader
+		dataReader := ioutil.NopCloser(rd)
+
+		// blobs smaller than 256 MiB can be created directly from the reader
+		err = be.container.GetBlobReference(objName).CreateBlockBlobFromReader(dataReader, nil)
+	} else {
+		// larger blobs need to be uploaded in blocks of 100 MiB
+		err = be.saveLarge(ctx, objName, rd)
+	}
 
 	be.sem.ReleaseToken()
 	debug.Log("%v, err %#v", objName, err)
@@ -138,6 +148,56 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
 	return errors.Wrap(err, "CreateBlockBlobFromReader")
 }
 
+func (be *Backend) saveLarge(ctx context.Context, objName string, rd restic.RewindReader) error {
+	// create the blob on the server
+	file := be.container.GetBlobReference(objName)
+	err := file.CreateBlockBlob(nil)
+	if err != nil {
+		return errors.Wrap(err, "CreateBlockBlob")
+	}
+
+	// read the data in 100 MiB chunks
+	buf := make([]byte, 100*1024*1024)
+	var blocks []storage.Block
+
+	for {
+		n, err := io.ReadFull(rd, buf)
+		if err == io.ErrUnexpectedEOF {
+			// a short read means we reached the final chunk, which is fine
+			err = nil
+		}
+		if err == io.EOF {
+			// end of file reached, no bytes have been read at all
+			break
+		}
+
+		if err != nil {
+			return errors.Wrap(err, "ReadFull")
+		}
+
+		buf = buf[:n]
+
+		// upload the chunk as a new "block", using the base64-encoded hash as the block ID
+		h := restic.Hash(buf)
+		id := base64.StdEncoding.EncodeToString(h[:])
+		debug.Log("PutBlock %v with %d bytes", id, len(buf))
+		err = file.PutBlock(id, buf, nil)
+		if err != nil {
+			return errors.Wrap(err, "PutBlock")
+		}
+
+		blocks = append(blocks, storage.Block{
+			ID:     id,
+			Status: "Uncommitted",
+		})
+	}
+
+	debug.Log("uploaded %d parts: %v", len(blocks), blocks)
+	err = file.PutBlockList(blocks, nil)
+	debug.Log("PutBlockList returned %v", err)
+	return errors.Wrap(err, "PutBlockList")
+}
+
 // wrapReader wraps an io.ReadCloser to run an additional function on Close.
 type wrapReader struct {
 	io.ReadCloser
diff --git a/internal/backend/azure/azure_test.go b/internal/backend/azure/azure_test.go
index d738de857..f5ef72395 100644
--- a/internal/backend/azure/azure_test.go
+++ b/internal/backend/azure/azure_test.go
@@ -1,8 +1,10 @@
 package azure_test
 
 import (
+	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"os"
 	"testing"
 	"time"
@@ -122,3 +124,91 @@ func BenchmarkBackendAzure(t *testing.B) {
 	t.Logf("run tests")
 	newAzureTestSuite(t).RunBenchmarks(t)
 }
+
+func TestUploadLargeFile(t *testing.T) {
+	if os.Getenv("RESTIC_AZURE_TEST_LARGE_UPLOAD") == "" {
+		t.Skip("set RESTIC_AZURE_TEST_LARGE_UPLOAD=1 to test large uploads")
+		return
+	}
+
+	ctx, cancel := context.WithCancel(context.TODO())
+	defer cancel()
+
+	if os.Getenv("RESTIC_TEST_AZURE_REPOSITORY") == "" {
+		t.Skipf("environment variables not available")
+		return
+	}
+
+	azcfg, err := azure.ParseConfig(os.Getenv("RESTIC_TEST_AZURE_REPOSITORY"))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	cfg := azcfg.(azure.Config)
+	cfg.AccountName = os.Getenv("RESTIC_TEST_AZURE_ACCOUNT_NAME")
+	cfg.AccountKey = os.Getenv("RESTIC_TEST_AZURE_ACCOUNT_KEY")
+	cfg.Prefix = fmt.Sprintf("test-upload-large-%d", time.Now().UnixNano())
+
+	tr, err := backend.Transport(backend.TransportOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	be, err := azure.Create(cfg, tr)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	defer func() {
+		err := be.Delete(ctx)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	data := rtest.Random(23, 300*1024*1024)
+	id := restic.Hash(data)
+	h := restic.Handle{Name: id.String(), Type: restic.DataFile}
+
+	t.Logf("hash of %d bytes: %v", len(data), id)
+
+	err = be.Save(ctx, h, restic.NewByteReader(data))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		err := be.Remove(ctx, h)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	var tests = []struct {
+		offset, length int
+	}{
+		{0, len(data)},
+		{23, 1024},
+		{23 + 100*1024, 500},
+		{888 + 200*1024, 89999},
+		{888 + 100*1024*1024, 120 * 1024 * 1024},
+	}
+
+	for _, test := range tests {
+		t.Run("", func(t *testing.T) {
+			want := data[test.offset : test.offset+test.length]
+
+			buf := make([]byte, test.length)
+			err = be.Load(ctx, h, test.length, int64(test.offset), func(rd io.Reader) error {
+				_, err = io.ReadFull(rd, buf)
+				return err
+			})
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			if !bytes.Equal(buf, want) {
+				t.Fatalf("wrong bytes returned")
+			}
+		})
+	}
+}
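
As background for the patch above: Azure block blobs are assembled from staged blocks, each uploaded with PutBlock and then committed in order with PutBlockList. The following is a minimal, self-contained sketch of that sequence against the same github.com/Azure/azure-sdk-for-go/storage client the patch uses; it is an illustration, not part of the change, and the account environment variables, container name, and input file name are hypothetical placeholders.

package main

import (
	"crypto/sha256"
	"encoding/base64"
	"io"
	"log"
	"os"

	"github.com/Azure/azure-sdk-for-go/storage"
)

// uploadInBlocks stages rd in chunks of chunkSize bytes via PutBlock and then
// commits them with PutBlockList, mirroring saveLarge in the patch above.
func uploadInBlocks(blob *storage.Blob, rd io.Reader, chunkSize int) error {
	if err := blob.CreateBlockBlob(nil); err != nil {
		return err
	}

	buf := make([]byte, chunkSize)
	var blocks []storage.Block

	for {
		n, err := io.ReadFull(rd, buf)
		if err == io.ErrUnexpectedEOF {
			err = nil // a short read only happens for the final chunk
		}
		if err == io.EOF {
			break // no data left at all
		}
		if err != nil {
			return err
		}

		// block IDs must be base64-encoded and the same length for every
		// block of a blob; a base64 SHA-256 satisfies both requirements
		sum := sha256.Sum256(buf[:n])
		id := base64.StdEncoding.EncodeToString(sum[:])

		if err := blob.PutBlock(id, buf[:n], nil); err != nil {
			return err
		}
		blocks = append(blocks, storage.Block{ID: id, Status: storage.BlockStatusUncommitted})
	}

	// committing the block list assembles the staged blocks into one blob
	return blob.PutBlockList(blocks, nil)
}

func main() {
	// hypothetical placeholders: account credentials, container, input file
	client, err := storage.NewBasicClient(os.Getenv("AZURE_ACCOUNT_NAME"), os.Getenv("AZURE_ACCOUNT_KEY"))
	if err != nil {
		log.Fatal(err)
	}
	blob := client.GetBlobService().GetContainerReference("example-container").GetBlobReference("example-object")

	f, err := os.Open("large-file.bin")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	if err := uploadInBlocks(blob, f, 100*1024*1024); err != nil {
		log.Fatal(err)
	}
}

Deriving the block ID from a hash of the chunk, as the patch does, keeps all IDs the same length (an API requirement) and makes re-staging an identical chunk idempotent, since uploading a block with an existing ID simply replaces it.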