mirror of
https://github.com/octoleo/restic.git
synced 2024-11-16 10:05:25 +00:00
8e1e3844aa
The SemaphoreBackend now uniformly enforces the limit of concurrent backend operations. In addition, it unifies the parameter validation. The List() methods no longer uses a semaphore. Restic already never runs multiple list operations in parallel. By managing the semaphore in a wrapper backend, the sections that hold a semaphore token grow slightly. However, the main bottleneck is IO, so this shouldn't make much of a difference. The key insight that enables the SemaphoreBackend is that all of the complex semaphore handling in `openReader()` still happens within the original call to `Load()`. Thus, getting and releasing the semaphore tokens can be refactored to happen directly in `Load()`. This eliminates the need for wrapping the reader in `openReader()` to release the token.
415 lines
11 KiB
Go
415 lines
11 KiB
Go
package azure
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/md5"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"hash"
|
|
"io"
|
|
"net/http"
|
|
"path"
|
|
"strings"
|
|
|
|
"github.com/restic/restic/internal/backend"
|
|
"github.com/restic/restic/internal/backend/layout"
|
|
"github.com/restic/restic/internal/debug"
|
|
"github.com/restic/restic/internal/errors"
|
|
"github.com/restic/restic/internal/restic"
|
|
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
|
|
azContainer "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
|
)
|
|
|
|
// Backend stores data on an azure endpoint.
|
|
type Backend struct {
|
|
cfg Config
|
|
container *azContainer.Client
|
|
connections uint
|
|
prefix string
|
|
listMaxItems int
|
|
layout.Layout
|
|
}
|
|
|
|
const saveLargeSize = 256 * 1024 * 1024
|
|
const defaultListMaxItems = 5000
|
|
|
|
// make sure that *Backend implements backend.Backend
|
|
var _ restic.Backend = &Backend{}
|
|
|
|
func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
|
|
debug.Log("open, config %#v", cfg)
|
|
var client *azContainer.Client
|
|
var err error
|
|
|
|
url := fmt.Sprintf("https://%s.blob.core.windows.net/%s", cfg.AccountName, cfg.Container)
|
|
opts := &azContainer.ClientOptions{
|
|
ClientOptions: azcore.ClientOptions{
|
|
Transport: http.DefaultClient,
|
|
},
|
|
}
|
|
|
|
if cfg.AccountKey.String() != "" {
|
|
// We have an account key value, find the BlobServiceClient
|
|
// from with a BasicClient
|
|
debug.Log(" - using account key")
|
|
cred, err := azblob.NewSharedKeyCredential(cfg.AccountName, cfg.AccountKey.Unwrap())
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "NewSharedKeyCredential")
|
|
}
|
|
|
|
client, err = azContainer.NewClientWithSharedKeyCredential(url, cred, opts)
|
|
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "NewClientWithSharedKeyCredential")
|
|
}
|
|
} else if cfg.AccountSAS.String() != "" {
|
|
// Get the client using the SAS Token as authentication, this
|
|
// is longer winded than above because the SDK wants a URL for the Account
|
|
// if your using a SAS token, and not just the account name
|
|
// we (as per the SDK ) assume the default Azure portal.
|
|
// https://github.com/Azure/azure-storage-blob-go/issues/130
|
|
debug.Log(" - using sas token")
|
|
sas := cfg.AccountSAS.Unwrap()
|
|
|
|
// strip query sign prefix
|
|
if sas[0] == '?' {
|
|
sas = sas[1:]
|
|
}
|
|
|
|
urlWithSAS := fmt.Sprintf("%s?%s", url, sas)
|
|
|
|
client, err = azContainer.NewClientWithNoCredential(urlWithSAS, opts)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "NewAccountSASClientFromEndpointToken")
|
|
}
|
|
} else {
|
|
return nil, errors.New("no azure authentication information found")
|
|
}
|
|
|
|
be := &Backend{
|
|
container: client,
|
|
cfg: cfg,
|
|
connections: cfg.Connections,
|
|
Layout: &layout.DefaultLayout{
|
|
Path: cfg.Prefix,
|
|
Join: path.Join,
|
|
},
|
|
listMaxItems: defaultListMaxItems,
|
|
}
|
|
|
|
return be, nil
|
|
}
|
|
|
|
// Open opens the Azure backend at specified container.
|
|
func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, error) {
|
|
return open(cfg, rt)
|
|
}
|
|
|
|
// Create opens the Azure backend at specified container and creates the container if
|
|
// it does not exist yet.
|
|
func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, error) {
|
|
be, err := open(cfg, rt)
|
|
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "open")
|
|
}
|
|
|
|
_, err = be.container.GetProperties(ctx, &azContainer.GetPropertiesOptions{})
|
|
|
|
if err != nil && bloberror.HasCode(err, bloberror.ContainerNotFound) {
|
|
_, err = be.container.Create(ctx, &azContainer.CreateOptions{})
|
|
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "container.Create")
|
|
}
|
|
} else if err != nil {
|
|
return be, err
|
|
}
|
|
|
|
return be, nil
|
|
}
|
|
|
|
// SetListMaxItems sets the number of list items to load per request.
|
|
func (be *Backend) SetListMaxItems(i int) {
|
|
be.listMaxItems = i
|
|
}
|
|
|
|
// IsNotExist returns true if the error is caused by a not existing file.
|
|
func (be *Backend) IsNotExist(err error) bool {
|
|
return bloberror.HasCode(err, bloberror.BlobNotFound)
|
|
}
|
|
|
|
// Join combines path components with slashes.
|
|
func (be *Backend) Join(p ...string) string {
|
|
return path.Join(p...)
|
|
}
|
|
|
|
func (be *Backend) Connections() uint {
|
|
return be.connections
|
|
}
|
|
|
|
// Location returns this backend's location (the container name).
|
|
func (be *Backend) Location() string {
|
|
return be.Join(be.cfg.AccountName, be.cfg.Prefix)
|
|
}
|
|
|
|
// Hasher may return a hash function for calculating a content hash for the backend
|
|
func (be *Backend) Hasher() hash.Hash {
|
|
return md5.New()
|
|
}
|
|
|
|
// HasAtomicReplace returns whether Save() can atomically replace files
|
|
func (be *Backend) HasAtomicReplace() bool {
|
|
return true
|
|
}
|
|
|
|
// Path returns the path in the bucket that is used for this backend.
|
|
func (be *Backend) Path() string {
|
|
return be.prefix
|
|
}
|
|
|
|
// Save stores data in the backend at the handle.
|
|
func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
|
objName := be.Filename(h)
|
|
|
|
debug.Log("InsertObject(%v, %v)", be.cfg.AccountName, objName)
|
|
|
|
var err error
|
|
if rd.Length() < saveLargeSize {
|
|
// if it's smaller than 256miB, then just create the file directly from the reader
|
|
err = be.saveSmall(ctx, objName, rd)
|
|
} else {
|
|
// otherwise use the more complicated method
|
|
err = be.saveLarge(ctx, objName, rd)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (be *Backend) saveSmall(ctx context.Context, objName string, rd restic.RewindReader) error {
|
|
blockBlobClient := be.container.NewBlockBlobClient(objName)
|
|
|
|
// upload it as a new "block", use the base64 hash for the ID
|
|
id := base64.StdEncoding.EncodeToString(rd.Hash())
|
|
|
|
buf := make([]byte, rd.Length())
|
|
_, err := io.ReadFull(rd, buf)
|
|
if err != nil {
|
|
return errors.Wrap(err, "ReadFull")
|
|
}
|
|
|
|
reader := bytes.NewReader(buf)
|
|
_, err = blockBlobClient.StageBlock(ctx, id, streaming.NopCloser(reader), &blockblob.StageBlockOptions{
|
|
TransactionalValidation: blob.TransferValidationTypeMD5(rd.Hash()),
|
|
})
|
|
if err != nil {
|
|
return errors.Wrap(err, "StageBlock")
|
|
}
|
|
|
|
blocks := []string{id}
|
|
_, err = blockBlobClient.CommitBlockList(ctx, blocks, &blockblob.CommitBlockListOptions{})
|
|
return errors.Wrap(err, "CommitBlockList")
|
|
}
|
|
|
|
func (be *Backend) saveLarge(ctx context.Context, objName string, rd restic.RewindReader) error {
|
|
blockBlobClient := be.container.NewBlockBlobClient(objName)
|
|
|
|
buf := make([]byte, 100*1024*1024)
|
|
blocks := []string{}
|
|
uploadedBytes := 0
|
|
|
|
for {
|
|
n, err := io.ReadFull(rd, buf)
|
|
if err == io.ErrUnexpectedEOF {
|
|
err = nil
|
|
}
|
|
|
|
if err == io.EOF {
|
|
// end of file reached, no bytes have been read at all
|
|
break
|
|
}
|
|
|
|
if err != nil {
|
|
return errors.Wrap(err, "ReadFull")
|
|
}
|
|
|
|
buf = buf[:n]
|
|
uploadedBytes += n
|
|
|
|
// upload it as a new "block", use the base64 hash for the ID
|
|
h := md5.Sum(buf)
|
|
id := base64.StdEncoding.EncodeToString(h[:])
|
|
|
|
reader := bytes.NewReader(buf)
|
|
debug.Log("StageBlock %v with %d bytes", id, len(buf))
|
|
_, err = blockBlobClient.StageBlock(ctx, id, streaming.NopCloser(reader), &blockblob.StageBlockOptions{
|
|
TransactionalValidation: blob.TransferValidationTypeMD5(h[:]),
|
|
})
|
|
|
|
if err != nil {
|
|
return errors.Wrap(err, "StageBlock")
|
|
}
|
|
|
|
blocks = append(blocks, id)
|
|
}
|
|
|
|
// sanity check
|
|
if uploadedBytes != int(rd.Length()) {
|
|
return errors.Errorf("wrote %d bytes instead of the expected %d bytes", uploadedBytes, rd.Length())
|
|
}
|
|
|
|
_, err := blockBlobClient.CommitBlockList(ctx, blocks, &blockblob.CommitBlockListOptions{})
|
|
|
|
debug.Log("uploaded %d parts: %v", len(blocks), blocks)
|
|
return errors.Wrap(err, "CommitBlockList")
|
|
}
|
|
|
|
// Load runs fn with a reader that yields the contents of the file at h at the
|
|
// given offset.
|
|
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
|
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
|
|
}
|
|
|
|
func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
|
objName := be.Filename(h)
|
|
blockBlobClient := be.container.NewBlobClient(objName)
|
|
|
|
resp, err := blockBlobClient.DownloadStream(ctx, &blob.DownloadStreamOptions{
|
|
Range: azblob.HTTPRange{
|
|
Offset: offset,
|
|
Count: int64(length),
|
|
},
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp.Body, err
|
|
}
|
|
|
|
// Stat returns information about a blob.
|
|
func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
|
|
objName := be.Filename(h)
|
|
blobClient := be.container.NewBlobClient(objName)
|
|
|
|
props, err := blobClient.GetProperties(ctx, nil)
|
|
|
|
if err != nil {
|
|
return restic.FileInfo{}, errors.Wrap(err, "blob.GetProperties")
|
|
}
|
|
|
|
fi := restic.FileInfo{
|
|
Size: *props.ContentLength,
|
|
Name: h.Name,
|
|
}
|
|
return fi, nil
|
|
}
|
|
|
|
// Remove removes the blob with the given name and type.
|
|
func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
|
objName := be.Filename(h)
|
|
blob := be.container.NewBlobClient(objName)
|
|
|
|
_, err := blob.Delete(ctx, &azblob.DeleteBlobOptions{})
|
|
|
|
if be.IsNotExist(err) {
|
|
return nil
|
|
}
|
|
|
|
return errors.Wrap(err, "client.RemoveObject")
|
|
}
|
|
|
|
// List runs fn for each file in the backend which has the type t. When an
|
|
// error occurs (or fn returns an error), List stops and returns it.
|
|
func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
|
|
prefix, _ := be.Basedir(t)
|
|
|
|
// make sure prefix ends with a slash
|
|
if !strings.HasSuffix(prefix, "/") {
|
|
prefix += "/"
|
|
}
|
|
|
|
max := int32(be.listMaxItems)
|
|
|
|
opts := &azContainer.ListBlobsFlatOptions{
|
|
MaxResults: &max,
|
|
Prefix: &prefix,
|
|
}
|
|
lister := be.container.NewListBlobsFlatPager(opts)
|
|
|
|
for lister.More() {
|
|
resp, err := lister.NextPage(ctx)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
debug.Log("got %v objects", len(resp.Segment.BlobItems))
|
|
|
|
for _, item := range resp.Segment.BlobItems {
|
|
m := strings.TrimPrefix(*item.Name, prefix)
|
|
if m == "" {
|
|
continue
|
|
}
|
|
|
|
fi := restic.FileInfo{
|
|
Name: path.Base(m),
|
|
Size: *item.Properties.ContentLength,
|
|
}
|
|
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
|
|
err := fn(fi)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
return ctx.Err()
|
|
}
|
|
|
|
// Remove keys for a specified backend type.
|
|
func (be *Backend) removeKeys(ctx context.Context, t restic.FileType) error {
|
|
return be.List(ctx, t, func(fi restic.FileInfo) error {
|
|
return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name})
|
|
})
|
|
}
|
|
|
|
// Delete removes all restic keys in the bucket. It will not remove the bucket itself.
|
|
func (be *Backend) Delete(ctx context.Context) error {
|
|
alltypes := []restic.FileType{
|
|
restic.PackFile,
|
|
restic.KeyFile,
|
|
restic.LockFile,
|
|
restic.SnapshotFile,
|
|
restic.IndexFile}
|
|
|
|
for _, t := range alltypes {
|
|
err := be.removeKeys(ctx, t)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return be.Remove(ctx, restic.Handle{Type: restic.ConfigFile})
|
|
}
|
|
|
|
// Close does nothing
|
|
func (be *Backend) Close() error { return nil }
|