mirror of
https://github.com/octoleo/restic.git
synced 2024-12-12 06:07:55 +00:00
533 lines
16 KiB
Go
533 lines
16 KiB
Go
|
/*
|
||
|
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2017 Minio, Inc.
|
||
|
*
|
||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
* you may not use this file except in compliance with the License.
|
||
|
* You may obtain a copy of the License at
|
||
|
*
|
||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||
|
*
|
||
|
* Unless required by applicable law or agreed to in writing, software
|
||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
* See the License for the specific language governing permissions and
|
||
|
* limitations under the License.
|
||
|
*/
|
||
|
|
||
|
package minio
|
||
|
|
||
|
import (
|
||
|
"encoding/base64"
|
||
|
"fmt"
|
||
|
"net/http"
|
||
|
"net/url"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"time"
|
||
|
|
||
|
"github.com/minio/minio-go/pkg/s3utils"
|
||
|
)
|
||
|
|
||
|
// SSEInfo - represents Server-Side-Encryption parameters specified by
|
||
|
// a user.
|
||
|
type SSEInfo struct {
|
||
|
key []byte
|
||
|
algo string
|
||
|
}
|
||
|
|
||
|
// NewSSEInfo - specifies (binary or un-encoded) encryption key and
|
||
|
// algorithm name. If algo is empty, it defaults to "AES256". Ref:
|
||
|
// https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
|
||
|
func NewSSEInfo(key []byte, algo string) SSEInfo {
|
||
|
if algo == "" {
|
||
|
algo = "AES256"
|
||
|
}
|
||
|
return SSEInfo{key, algo}
|
||
|
}
|
||
|
|
||
|
// internal method that computes SSE-C headers
|
||
|
func (s *SSEInfo) getSSEHeaders(isCopySource bool) map[string]string {
|
||
|
if s == nil {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
cs := ""
|
||
|
if isCopySource {
|
||
|
cs = "copy-source-"
|
||
|
}
|
||
|
return map[string]string{
|
||
|
"x-amz-" + cs + "server-side-encryption-customer-algorithm": s.algo,
|
||
|
"x-amz-" + cs + "server-side-encryption-customer-key": base64.StdEncoding.EncodeToString(s.key),
|
||
|
"x-amz-" + cs + "server-side-encryption-customer-key-MD5": base64.StdEncoding.EncodeToString(sumMD5(s.key)),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// GetSSEHeaders - computes and returns headers for SSE-C as key-value
|
||
|
// pairs. They can be set as metadata in PutObject* requests (for
|
||
|
// encryption) or be set as request headers in `Core.GetObject` (for
|
||
|
// decryption).
|
||
|
func (s *SSEInfo) GetSSEHeaders() map[string]string {
|
||
|
return s.getSSEHeaders(false)
|
||
|
}
|
||
|
|
||
|
// DestinationInfo - type with information about the object to be
|
||
|
// created via server-side copy requests, using the Compose API.
|
||
|
type DestinationInfo struct {
|
||
|
bucket, object string
|
||
|
|
||
|
// key for encrypting destination
|
||
|
encryption *SSEInfo
|
||
|
|
||
|
// if no user-metadata is provided, it is copied from source
|
||
|
// (when there is only once source object in the compose
|
||
|
// request)
|
||
|
userMetadata map[string]string
|
||
|
}
|
||
|
|
||
|
// NewDestinationInfo - creates a compose-object/copy-source
|
||
|
// destination info object.
|
||
|
//
|
||
|
// `encSSEC` is the key info for server-side-encryption with customer
|
||
|
// provided key. If it is nil, no encryption is performed.
|
||
|
//
|
||
|
// `userMeta` is the user-metadata key-value pairs to be set on the
|
||
|
// destination. The keys are automatically prefixed with `x-amz-meta-`
|
||
|
// if needed. If nil is passed, and if only a single source (of any
|
||
|
// size) is provided in the ComposeObject call, then metadata from the
|
||
|
// source is copied to the destination.
|
||
|
func NewDestinationInfo(bucket, object string, encryptSSEC *SSEInfo,
|
||
|
userMeta map[string]string) (d DestinationInfo, err error) {
|
||
|
|
||
|
// Input validation.
|
||
|
if err = s3utils.CheckValidBucketName(bucket); err != nil {
|
||
|
return d, err
|
||
|
}
|
||
|
if err = s3utils.CheckValidObjectName(object); err != nil {
|
||
|
return d, err
|
||
|
}
|
||
|
|
||
|
// Process custom-metadata to remove a `x-amz-meta-` prefix if
|
||
|
// present and validate that keys are distinct (after this
|
||
|
// prefix removal).
|
||
|
m := make(map[string]string)
|
||
|
for k, v := range userMeta {
|
||
|
if strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") {
|
||
|
k = k[len("x-amz-meta-"):]
|
||
|
}
|
||
|
if _, ok := m[k]; ok {
|
||
|
return d, fmt.Errorf("Cannot add both %s and x-amz-meta-%s keys as custom metadata", k, k)
|
||
|
}
|
||
|
m[k] = v
|
||
|
}
|
||
|
|
||
|
return DestinationInfo{
|
||
|
bucket: bucket,
|
||
|
object: object,
|
||
|
encryption: encryptSSEC,
|
||
|
userMetadata: m,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// getUserMetaHeadersMap - construct appropriate key-value pairs to send
|
||
|
// as headers from metadata map to pass into copy-object request. For
|
||
|
// single part copy-object (i.e. non-multipart object), enable the
|
||
|
// withCopyDirectiveHeader to set the `x-amz-metadata-directive` to
|
||
|
// `REPLACE`, so that metadata headers from the source are not copied
|
||
|
// over.
|
||
|
func (d *DestinationInfo) getUserMetaHeadersMap(withCopyDirectiveHeader bool) map[string]string {
|
||
|
if len(d.userMetadata) == 0 {
|
||
|
return nil
|
||
|
}
|
||
|
r := make(map[string]string)
|
||
|
if withCopyDirectiveHeader {
|
||
|
r["x-amz-metadata-directive"] = "REPLACE"
|
||
|
}
|
||
|
for k, v := range d.userMetadata {
|
||
|
r["x-amz-meta-"+k] = v
|
||
|
}
|
||
|
return r
|
||
|
}
|
||
|
|
||
|
// SourceInfo - represents a source object to be copied, using
|
||
|
// server-side copying APIs.
|
||
|
type SourceInfo struct {
|
||
|
bucket, object string
|
||
|
|
||
|
start, end int64
|
||
|
|
||
|
decryptKey *SSEInfo
|
||
|
// Headers to send with the upload-part-copy request involving
|
||
|
// this source object.
|
||
|
Headers http.Header
|
||
|
}
|
||
|
|
||
|
// NewSourceInfo - create a compose-object/copy-object source info
|
||
|
// object.
|
||
|
//
|
||
|
// `decryptSSEC` is the decryption key using server-side-encryption
|
||
|
// with customer provided key. It may be nil if the source is not
|
||
|
// encrypted.
|
||
|
func NewSourceInfo(bucket, object string, decryptSSEC *SSEInfo) SourceInfo {
|
||
|
r := SourceInfo{
|
||
|
bucket: bucket,
|
||
|
object: object,
|
||
|
start: -1, // range is unspecified by default
|
||
|
decryptKey: decryptSSEC,
|
||
|
Headers: make(http.Header),
|
||
|
}
|
||
|
|
||
|
// Set the source header
|
||
|
r.Headers.Set("x-amz-copy-source", s3utils.EncodePath(bucket+"/"+object))
|
||
|
|
||
|
// Assemble decryption headers for upload-part-copy request
|
||
|
for k, v := range decryptSSEC.getSSEHeaders(true) {
|
||
|
r.Headers.Set(k, v)
|
||
|
}
|
||
|
|
||
|
return r
|
||
|
}
|
||
|
|
||
|
// SetRange - Set the start and end offset of the source object to be
|
||
|
// copied. If this method is not called, the whole source object is
|
||
|
// copied.
|
||
|
func (s *SourceInfo) SetRange(start, end int64) error {
|
||
|
if start > end || start < 0 {
|
||
|
return ErrInvalidArgument("start must be non-negative, and start must be at most end.")
|
||
|
}
|
||
|
// Note that 0 <= start <= end
|
||
|
s.start, s.end = start, end
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// SetMatchETagCond - Set ETag match condition. The object is copied
|
||
|
// only if the etag of the source matches the value given here.
|
||
|
func (s *SourceInfo) SetMatchETagCond(etag string) error {
|
||
|
if etag == "" {
|
||
|
return ErrInvalidArgument("ETag cannot be empty.")
|
||
|
}
|
||
|
s.Headers.Set("x-amz-copy-source-if-match", etag)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// SetMatchETagExceptCond - Set the ETag match exception
|
||
|
// condition. The object is copied only if the etag of the source is
|
||
|
// not the value given here.
|
||
|
func (s *SourceInfo) SetMatchETagExceptCond(etag string) error {
|
||
|
if etag == "" {
|
||
|
return ErrInvalidArgument("ETag cannot be empty.")
|
||
|
}
|
||
|
s.Headers.Set("x-amz-copy-source-if-none-match", etag)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// SetModifiedSinceCond - Set the modified since condition.
|
||
|
func (s *SourceInfo) SetModifiedSinceCond(modTime time.Time) error {
|
||
|
if modTime.IsZero() {
|
||
|
return ErrInvalidArgument("Input time cannot be 0.")
|
||
|
}
|
||
|
s.Headers.Set("x-amz-copy-source-if-modified-since", modTime.Format(http.TimeFormat))
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// SetUnmodifiedSinceCond - Set the unmodified since condition.
|
||
|
func (s *SourceInfo) SetUnmodifiedSinceCond(modTime time.Time) error {
|
||
|
if modTime.IsZero() {
|
||
|
return ErrInvalidArgument("Input time cannot be 0.")
|
||
|
}
|
||
|
s.Headers.Set("x-amz-copy-source-if-unmodified-since", modTime.Format(http.TimeFormat))
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Helper to fetch size and etag of an object using a StatObject call.
|
||
|
func (s *SourceInfo) getProps(c Client) (size int64, etag string, userMeta map[string]string, err error) {
|
||
|
// Get object info - need size and etag here. Also, decryption
|
||
|
// headers are added to the stat request if given.
|
||
|
var objInfo ObjectInfo
|
||
|
rh := NewGetReqHeaders()
|
||
|
for k, v := range s.decryptKey.getSSEHeaders(false) {
|
||
|
rh.Set(k, v)
|
||
|
}
|
||
|
objInfo, err = c.statObject(s.bucket, s.object, rh)
|
||
|
if err != nil {
|
||
|
err = fmt.Errorf("Could not stat object - %s/%s: %v", s.bucket, s.object, err)
|
||
|
} else {
|
||
|
size = objInfo.Size
|
||
|
etag = objInfo.ETag
|
||
|
userMeta = make(map[string]string)
|
||
|
for k, v := range objInfo.Metadata {
|
||
|
if strings.HasPrefix(k, "x-amz-meta-") {
|
||
|
if len(v) > 0 {
|
||
|
userMeta[k] = v[0]
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// uploadPartCopy - helper function to create a part in a multipart
|
||
|
// upload via an upload-part-copy request
|
||
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
|
||
|
func (c Client) uploadPartCopy(bucket, object, uploadID string, partNumber int,
|
||
|
headers http.Header) (p CompletePart, err error) {
|
||
|
|
||
|
// Build query parameters
|
||
|
urlValues := make(url.Values)
|
||
|
urlValues.Set("partNumber", strconv.Itoa(partNumber))
|
||
|
urlValues.Set("uploadId", uploadID)
|
||
|
|
||
|
// Send upload-part-copy request
|
||
|
resp, err := c.executeMethod("PUT", requestMetadata{
|
||
|
bucketName: bucket,
|
||
|
objectName: object,
|
||
|
customHeader: headers,
|
||
|
queryValues: urlValues,
|
||
|
})
|
||
|
defer closeResponse(resp)
|
||
|
if err != nil {
|
||
|
return p, err
|
||
|
}
|
||
|
|
||
|
// Check if we got an error response.
|
||
|
if resp.StatusCode != http.StatusOK {
|
||
|
return p, httpRespToErrorResponse(resp, bucket, object)
|
||
|
}
|
||
|
|
||
|
// Decode copy-part response on success.
|
||
|
cpObjRes := copyObjectResult{}
|
||
|
err = xmlDecoder(resp.Body, &cpObjRes)
|
||
|
if err != nil {
|
||
|
return p, err
|
||
|
}
|
||
|
p.PartNumber, p.ETag = partNumber, cpObjRes.ETag
|
||
|
return p, nil
|
||
|
}
|
||
|
|
||
|
// ComposeObject - creates an object using server-side copying of
|
||
|
// existing objects. It takes a list of source objects (with optional
|
||
|
// offsets) and concatenates them into a new object using only
|
||
|
// server-side copying operations.
|
||
|
func (c Client) ComposeObject(dst DestinationInfo, srcs []SourceInfo) error {
|
||
|
if len(srcs) < 1 || len(srcs) > maxPartsCount {
|
||
|
return ErrInvalidArgument("There must be as least one and upto 10000 source objects.")
|
||
|
}
|
||
|
|
||
|
srcSizes := make([]int64, len(srcs))
|
||
|
var totalSize, size, totalParts int64
|
||
|
var srcUserMeta map[string]string
|
||
|
var etag string
|
||
|
var err error
|
||
|
for i, src := range srcs {
|
||
|
size, etag, srcUserMeta, err = src.getProps(c)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("Could not get source props for %s/%s: %v", src.bucket, src.object, err)
|
||
|
}
|
||
|
|
||
|
// Error out if client side encryption is used in this source object when
|
||
|
// more than one source objects are given.
|
||
|
if len(srcs) > 1 && src.Headers.Get("x-amz-meta-x-amz-key") != "" {
|
||
|
return ErrInvalidArgument(
|
||
|
fmt.Sprintf("Client side encryption is used in source object %s/%s", src.bucket, src.object))
|
||
|
}
|
||
|
|
||
|
// Since we did a HEAD to get size, we use the ETag
|
||
|
// value to make sure the object has not changed by
|
||
|
// the time we perform the copy. This is done, only if
|
||
|
// the user has not set their own ETag match
|
||
|
// condition.
|
||
|
if src.Headers.Get("x-amz-copy-source-if-match") == "" {
|
||
|
src.SetMatchETagCond(etag)
|
||
|
}
|
||
|
|
||
|
// Check if a segment is specified, and if so, is the
|
||
|
// segment within object bounds?
|
||
|
if src.start != -1 {
|
||
|
// Since range is specified,
|
||
|
// 0 <= src.start <= src.end
|
||
|
// so only invalid case to check is:
|
||
|
if src.end >= size {
|
||
|
return ErrInvalidArgument(
|
||
|
fmt.Sprintf("SourceInfo %d has invalid segment-to-copy [%d, %d] (size is %d)",
|
||
|
i, src.start, src.end, size))
|
||
|
}
|
||
|
size = src.end - src.start + 1
|
||
|
}
|
||
|
|
||
|
// Only the last source may be less than `absMinPartSize`
|
||
|
if size < absMinPartSize && i < len(srcs)-1 {
|
||
|
return ErrInvalidArgument(
|
||
|
fmt.Sprintf("SourceInfo %d is too small (%d) and it is not the last part", i, size))
|
||
|
}
|
||
|
|
||
|
// Is data to copy too large?
|
||
|
totalSize += size
|
||
|
if totalSize > maxMultipartPutObjectSize {
|
||
|
return ErrInvalidArgument(fmt.Sprintf("Cannot compose an object of size %d (> 5TiB)", totalSize))
|
||
|
}
|
||
|
|
||
|
// record source size
|
||
|
srcSizes[i] = size
|
||
|
|
||
|
// calculate parts needed for current source
|
||
|
totalParts += partsRequired(size)
|
||
|
// Do we need more parts than we are allowed?
|
||
|
if totalParts > maxPartsCount {
|
||
|
return ErrInvalidArgument(fmt.Sprintf(
|
||
|
"Your proposed compose object requires more than %d parts", maxPartsCount))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Single source object case (i.e. when only one source is
|
||
|
// involved, it is being copied wholly and at most 5GiB in
|
||
|
// size).
|
||
|
if totalParts == 1 && srcs[0].start == -1 && totalSize <= maxPartSize {
|
||
|
h := srcs[0].Headers
|
||
|
// Add destination encryption headers
|
||
|
for k, v := range dst.encryption.getSSEHeaders(false) {
|
||
|
h.Set(k, v)
|
||
|
}
|
||
|
|
||
|
// If no user metadata is specified (and so, the
|
||
|
// for-loop below is not entered), metadata from the
|
||
|
// source is copied to the destination (due to
|
||
|
// single-part copy-object PUT request behaviour).
|
||
|
for k, v := range dst.getUserMetaHeadersMap(true) {
|
||
|
h.Set(k, v)
|
||
|
}
|
||
|
|
||
|
// Send copy request
|
||
|
resp, err := c.executeMethod("PUT", requestMetadata{
|
||
|
bucketName: dst.bucket,
|
||
|
objectName: dst.object,
|
||
|
customHeader: h,
|
||
|
})
|
||
|
defer closeResponse(resp)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
// Check if we got an error response.
|
||
|
if resp.StatusCode != http.StatusOK {
|
||
|
return httpRespToErrorResponse(resp, dst.bucket, dst.object)
|
||
|
}
|
||
|
|
||
|
// Return nil on success.
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Now, handle multipart-copy cases.
|
||
|
|
||
|
// 1. Initiate a new multipart upload.
|
||
|
|
||
|
// Set user-metadata on the destination object. If no
|
||
|
// user-metadata is specified, and there is only one source,
|
||
|
// (only) then metadata from source is copied.
|
||
|
userMeta := dst.getUserMetaHeadersMap(false)
|
||
|
metaMap := userMeta
|
||
|
if len(userMeta) == 0 && len(srcs) == 1 {
|
||
|
metaMap = srcUserMeta
|
||
|
}
|
||
|
metaHeaders := make(map[string][]string)
|
||
|
for k, v := range metaMap {
|
||
|
metaHeaders[k] = append(metaHeaders[k], v)
|
||
|
}
|
||
|
uploadID, err := c.newUploadID(dst.bucket, dst.object, metaHeaders)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("Error creating new upload: %v", err)
|
||
|
}
|
||
|
|
||
|
// 2. Perform copy part uploads
|
||
|
objParts := []CompletePart{}
|
||
|
partIndex := 1
|
||
|
for i, src := range srcs {
|
||
|
h := src.Headers
|
||
|
// Add destination encryption headers
|
||
|
for k, v := range dst.encryption.getSSEHeaders(false) {
|
||
|
h.Set(k, v)
|
||
|
}
|
||
|
|
||
|
// calculate start/end indices of parts after
|
||
|
// splitting.
|
||
|
startIdx, endIdx := calculateEvenSplits(srcSizes[i], src)
|
||
|
for j, start := range startIdx {
|
||
|
end := endIdx[j]
|
||
|
|
||
|
// Add (or reset) source range header for
|
||
|
// upload part copy request.
|
||
|
h.Set("x-amz-copy-source-range",
|
||
|
fmt.Sprintf("bytes=%d-%d", start, end))
|
||
|
|
||
|
// make upload-part-copy request
|
||
|
complPart, err := c.uploadPartCopy(dst.bucket,
|
||
|
dst.object, uploadID, partIndex, h)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("Error in upload-part-copy - %v", err)
|
||
|
}
|
||
|
objParts = append(objParts, complPart)
|
||
|
partIndex++
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// 3. Make final complete-multipart request.
|
||
|
_, err = c.completeMultipartUpload(dst.bucket, dst.object, uploadID,
|
||
|
completeMultipartUpload{Parts: objParts})
|
||
|
if err != nil {
|
||
|
err = fmt.Errorf("Error in complete-multipart request - %v", err)
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// partsRequired is ceiling(size / copyPartSize)
|
||
|
func partsRequired(size int64) int64 {
|
||
|
r := size / copyPartSize
|
||
|
if size%copyPartSize > 0 {
|
||
|
r++
|
||
|
}
|
||
|
return r
|
||
|
}
|
||
|
|
||
|
// calculateEvenSplits - computes splits for a source and returns
|
||
|
// start and end index slices. Splits happen evenly to be sure that no
|
||
|
// part is less than 5MiB, as that could fail the multipart request if
|
||
|
// it is not the last part.
|
||
|
func calculateEvenSplits(size int64, src SourceInfo) (startIndex, endIndex []int64) {
|
||
|
if size == 0 {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
reqParts := partsRequired(size)
|
||
|
startIndex = make([]int64, reqParts)
|
||
|
endIndex = make([]int64, reqParts)
|
||
|
// Compute number of required parts `k`, as:
|
||
|
//
|
||
|
// k = ceiling(size / copyPartSize)
|
||
|
//
|
||
|
// Now, distribute the `size` bytes in the source into
|
||
|
// k parts as evenly as possible:
|
||
|
//
|
||
|
// r parts sized (q+1) bytes, and
|
||
|
// (k - r) parts sized q bytes, where
|
||
|
//
|
||
|
// size = q * k + r (by simple division of size by k,
|
||
|
// so that 0 <= r < k)
|
||
|
//
|
||
|
start := src.start
|
||
|
if start == -1 {
|
||
|
start = 0
|
||
|
}
|
||
|
quot, rem := size/reqParts, size%reqParts
|
||
|
nextStart := start
|
||
|
for j := int64(0); j < reqParts; j++ {
|
||
|
curPartSize := quot
|
||
|
if j < rem {
|
||
|
curPartSize++
|
||
|
}
|
||
|
|
||
|
cStart := nextStart
|
||
|
cEnd := cStart + curPartSize - 1
|
||
|
nextStart = cEnd + 1
|
||
|
|
||
|
startIndex[j], endIndex[j] = cStart, cEnd
|
||
|
}
|
||
|
return
|
||
|
}
|