Update minio-library

This addresses #388
This commit is contained in:
Alexander Neumann 2016-01-27 23:18:23 +01:00
parent ce4a7f16ca
commit f8daadc5ef
35 changed files with 922 additions and 2626 deletions

4
Godeps/Godeps.json generated
View File

@ -24,8 +24,8 @@
},
{
"ImportPath": "github.com/minio/minio-go",
"Comment": "v0.2.5-209-g77f35ea",
"Rev": "77f35ea56099f50b0425d0e2f3949773dae723c0"
"Comment": "v0.2.5-247-ge168a01",
"Rev": "e168a01f2683897cc623db67f39ccff35bec0597"
},
{
"ImportPath": "github.com/pkg/sftp",

View File

@ -10,8 +10,8 @@ env:
- ARCH=i686
go:
- 1.5.1
- 1.5.2
- 1.5.3
script:
- go vet ./...

View File

@ -72,8 +72,8 @@ func main() {
* [ListIncompleteUploads(bucketName, prefix, recursive, chan<- struct{}) <-chan ObjectMultipartInfo](examples/s3/listincompleteuploads.go)
### Object Operations.
* [PutObject(bucketName, objectName, io.Reader, size, contentType) error](examples/s3/putobject.go)
* [GetObject(bucketName, objectName) (io.ReadCloser, ObjectInfo, error)](examples/s3/getobject.go)
* [PutObject(bucketName, objectName, io.Reader, contentType) error](examples/s3/putobject.go)
* [GetObject(bucketName, objectName) (*Object, error)](examples/s3/getobject.go)
* [StatObject(bucketName, objectName) (ObjectInfo, error)](examples/s3/statobject.go)
* [RemoveObject(bucketName, objectName) error](examples/s3/removeobject.go)
* [RemoveIncompleteUpload(bucketName, objectName) <-chan error](examples/s3/removeincompleteupload.go)

View File

@ -21,9 +21,9 @@ import "time"
// BucketInfo container for bucket metadata.
type BucketInfo struct {
// The name of the bucket.
Name string
Name string `json:"name"`
// Date the bucket was created.
CreationDate time.Time
CreationDate time.Time `json:"creationDate"`
}
// ObjectInfo container for object metadata.
@ -31,24 +31,24 @@ type ObjectInfo struct {
// An ETag is optionally set to md5sum of an object. In case of multipart objects,
// ETag is of the form MD5SUM-N where MD5SUM is md5sum of all individual md5sums of
// each parts concatenated into one string.
ETag string
ETag string `json:"etag"`
Key string // Name of the object
LastModified time.Time // Date and time the object was last modified.
Size int64 // Size in bytes of the object.
ContentType string // A standard MIME type describing the format of the object data.
Key string `json:"name"` // Name of the object
LastModified time.Time `json:"lastModified"` // Date and time the object was last modified.
Size int64 `json:"size"` // Size in bytes of the object.
ContentType string `json:"contentType"` // A standard MIME type describing the format of the object data.
// Owner name.
Owner struct {
DisplayName string
ID string
}
DisplayName string `json:"name"`
ID string `json:"id"`
} `json:"owner"`
// The class of storage used to store the object.
StorageClass string
StorageClass string `json:"storageClass"`
// Error
Err error
Err error `json:"-"`
}
// ObjectMultipartInfo container for multipart object metadata.

View File

@ -83,9 +83,9 @@ const (
reportIssue = "Please report this issue at https://github.com/minio/minio-go/issues."
)
// HTTPRespToErrorResponse returns a new encoded ErrorResponse
// httpRespToErrorResponse returns a new encoded ErrorResponse
// structure as error.
func HTTPRespToErrorResponse(resp *http.Response, bucketName, objectName string) error {
func httpRespToErrorResponse(resp *http.Response, bucketName, objectName string) error {
if resp == nil {
msg := "Response is empty. " + reportIssue
return ErrInvalidArgument(msg)
@ -161,8 +161,8 @@ func HTTPRespToErrorResponse(resp *http.Response, bucketName, objectName string)
}
// ErrEntityTooLarge - Input size is larger than supported maximum.
func ErrEntityTooLarge(totalSize int64, bucketName, objectName string) error {
msg := fmt.Sprintf("Your proposed upload size %d exceeds the maximum allowed object size '5GiB' for single PUT operation.", totalSize)
func ErrEntityTooLarge(totalSize, maxObjectSize int64, bucketName, objectName string) error {
msg := fmt.Sprintf("Your proposed upload size %d exceeds the maximum allowed object size %d for single PUT operation.", totalSize, maxObjectSize)
return ErrorResponse{
Code: "EntityTooLarge",
Message: msg,
@ -182,19 +182,6 @@ func ErrEntityTooSmall(totalSize int64, bucketName, objectName string) error {
}
}
// ErrUnexpectedShortRead - Unexpected shorter read of input buffer from
// target.
func ErrUnexpectedShortRead(totalRead, totalSize int64, bucketName, objectName string) error {
msg := fmt.Sprintf("Data read %s is shorter than the size %s of input buffer.",
strconv.FormatInt(totalRead, 10), strconv.FormatInt(totalSize, 10))
return ErrorResponse{
Code: "UnexpectedShortRead",
Message: msg,
BucketName: bucketName,
Key: objectName,
}
}
// ErrUnexpectedEOF - Unexpected end of file reached.
func ErrUnexpectedEOF(totalRead, totalSize int64, bucketName, objectName string) error {
msg := fmt.Sprintf("Data read %s is not equal to the size %s of the input Reader.",

View File

@ -90,7 +90,9 @@ func (c Client) FGetObject(bucketName, objectName, filePath string) error {
}
// Close the file before rename, this is specifically needed for Windows users.
filePart.Close()
if err = filePart.Close(); err != nil {
return err
}
// Safely completed. Now commit by renaming to actual filename.
if err = os.Rename(filePartPath, filePath); err != nil {

View File

@ -65,7 +65,7 @@ func (c Client) GetBucketACL(bucketName string) (BucketACL, error) {
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return "", HTTPRespToErrorResponse(resp, bucketName, "")
return "", httpRespToErrorResponse(resp, bucketName, "")
}
}
@ -500,7 +500,7 @@ func (c Client) getObject(bucketName, objectName string, offset, length int64) (
}
if resp != nil {
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
return nil, ObjectInfo{}, HTTPRespToErrorResponse(resp, bucketName, objectName)
return nil, ObjectInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
}
}

View File

@ -47,7 +47,7 @@ func (c Client) ListBuckets() ([]BucketInfo, error) {
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return nil, HTTPRespToErrorResponse(resp, "", "")
return nil, httpRespToErrorResponse(resp, "", "")
}
}
listAllMyBucketsResult := listAllMyBucketsResult{}
@ -64,7 +64,7 @@ func (c Client) ListBuckets() ([]BucketInfo, error) {
// the specified bucket. If recursion is enabled it would list
// all subdirectories and all its contents.
//
// Your input paramters are just bucketName, objectPrefix, recursive
// Your input parameters are just bucketName, objectPrefix, recursive
// and a done channel for pro-actively closing the internal go
// routine. If you enable recursive as 'true' this function will
// return back all the objects in a given bucket name and object
@ -168,7 +168,7 @@ func (c Client) ListObjects(bucketName, objectPrefix string, recursive bool, don
// listObjects - (List Objects) - List some or all (up to 1000) of the objects in a bucket.
//
// You can use the request parameters as selection criteria to return a subset of the objects in a bucket.
// request paramters :-
// request parameters :-
// ---------
// ?marker - Specifies the key to start with when listing objects in a bucket.
// ?delimiter - A delimiter is a character you use to group keys.
@ -222,7 +222,7 @@ func (c Client) listObjectsQuery(bucketName, objectPrefix, objectMarker, delimit
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return listBucketResult{}, HTTPRespToErrorResponse(resp, bucketName, "")
return listBucketResult{}, httpRespToErrorResponse(resp, bucketName, "")
}
}
// Decode listBuckets XML.
@ -240,8 +240,8 @@ func (c Client) listObjectsQuery(bucketName, objectPrefix, objectMarker, delimit
// objectPrefix from the specified bucket. If recursion is enabled
// it would list all subdirectories and all its contents.
//
// Your input paramters are just bucketName, objectPrefix, recursive
// and a done channel to proactively close the internal go routine.
// Your input parameters are just bucketName, objectPrefix, recursive
// and a done channel to pro-actively close the internal go routine.
// If you enable recursive as 'true' this function will return back all
// the multipart objects in a given bucket name.
//
@ -352,7 +352,7 @@ func (c Client) listIncompleteUploads(bucketName, objectPrefix string, recursive
// - Lists some or all (up to 1000) in-progress multipart uploads in a bucket.
//
// You can use the request parameters as selection criteria to return a subset of the uploads in a bucket.
// request paramters. :-
// request parameters. :-
// ---------
// ?key-marker - Specifies the multipart upload after which listing should begin.
// ?upload-id-marker - Together with key-marker specifies the multipart upload after which listing should begin.
@ -404,7 +404,7 @@ func (c Client) listMultipartUploadsQuery(bucketName, keyMarker, uploadIDMarker,
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return listMultipartUploadsResult{}, HTTPRespToErrorResponse(resp, bucketName, "")
return listMultipartUploadsResult{}, httpRespToErrorResponse(resp, bucketName, "")
}
}
// Decode response body.
@ -447,23 +447,29 @@ func (c Client) listObjectParts(bucketName, objectName, uploadID string) (partsI
}
// findUploadID lists all incomplete uploads and finds the uploadID of the matching object name.
func (c Client) findUploadID(bucketName, objectName string) (string, error) {
func (c Client) findUploadID(bucketName, objectName string) (uploadID string, err error) {
// Make list incomplete uploads recursive.
isRecursive := true
// Turn off size aggregation of individual parts, in this request.
isAggregateSize := false
// NOTE: done Channel is set to 'nil, this will drain go routine until exhaustion.
for mpUpload := range c.listIncompleteUploads(bucketName, objectName, isRecursive, isAggregateSize, nil) {
// latestUpload to track the latest multipart info for objectName.
var latestUpload ObjectMultipartInfo
// Create done channel to cleanup the routine.
doneCh := make(chan struct{})
defer close(doneCh)
// List all incomplete uploads.
for mpUpload := range c.listIncompleteUploads(bucketName, objectName, isRecursive, isAggregateSize, doneCh) {
if mpUpload.Err != nil {
return "", mpUpload.Err
}
// if object name found, return the upload id.
if objectName == mpUpload.Key {
return mpUpload.UploadID, nil
if mpUpload.Initiated.Sub(latestUpload.Initiated) > 0 {
latestUpload = mpUpload
}
}
}
// No upload id was found, return success and empty upload id.
return "", nil
// Return the latest upload id.
return latestUpload.UploadID, nil
}
// getTotalMultipartSize - calculate total uploaded size for the a given multipart object.
@ -484,7 +490,7 @@ func (c Client) getTotalMultipartSize(bucketName, objectName, uploadID string) (
// for a specific multipart upload
//
// You can use the request parameters as selection criteria to return
// a subset of the uploads in a bucket, request paramters :-
// a subset of the uploads in a bucket, request parameters :-
// ---------
// ?part-number-marker - Specifies the part after which listing should
// begin.
@ -520,7 +526,7 @@ func (c Client) listObjectPartsQuery(bucketName, objectName, uploadID string, pa
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return listObjectPartsResult{}, HTTPRespToErrorResponse(resp, bucketName, objectName)
return listObjectPartsResult{}, httpRespToErrorResponse(resp, bucketName, objectName)
}
}
// Decode list object parts XML.

View File

@ -21,9 +21,9 @@ import (
"time"
)
// PresignedGetObject - Returns a presigned URL to access an object without credentials.
// presignURL - Returns a presigned URL for an input 'method'.
// Expires maximum is 7days - ie. 604800 and minimum is 1.
func (c Client) PresignedGetObject(bucketName, objectName string, expires time.Duration) (string, error) {
func (c Client) presignURL(method string, bucketName string, objectName string, expires time.Duration) (url string, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return "", err
@ -35,10 +35,14 @@ func (c Client) PresignedGetObject(bucketName, objectName string, expires time.D
return "", err
}
if method == "" {
return "", ErrInvalidArgument("method cannot be empty.")
}
expireSeconds := int64(expires / time.Second)
// Instantiate a new request.
// Since expires is set newRequest will presign the request.
req, err := c.newRequest("GET", requestMetadata{
req, err := c.newRequest(method, requestMetadata{
presignURL: true,
bucketName: bucketName,
objectName: objectName,
@ -50,33 +54,16 @@ func (c Client) PresignedGetObject(bucketName, objectName string, expires time.D
return req.URL.String(), nil
}
// PresignedGetObject - Returns a presigned URL to access an object without credentials.
// Expires maximum is 7days - ie. 604800 and minimum is 1.
func (c Client) PresignedGetObject(bucketName string, objectName string, expires time.Duration) (url string, err error) {
return c.presignURL("GET", bucketName, objectName, expires)
}
// PresignedPutObject - Returns a presigned URL to upload an object without credentials.
// Expires maximum is 7days - ie. 604800 and minimum is 1.
func (c Client) PresignedPutObject(bucketName, objectName string, expires time.Duration) (string, error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return "", err
}
if err := isValidObjectName(objectName); err != nil {
return "", err
}
if err := isValidExpiry(expires); err != nil {
return "", err
}
expireSeconds := int64(expires / time.Second)
// Instantiate a new request.
// Since expires is set newRequest will presign the request.
req, err := c.newRequest("PUT", requestMetadata{
presignURL: true,
bucketName: bucketName,
objectName: objectName,
expires: expireSeconds,
})
if err != nil {
return "", err
}
return req.URL.String(), nil
func (c Client) PresignedPutObject(bucketName string, objectName string, expires time.Duration) (url string, err error) {
return c.presignURL("PUT", bucketName, objectName, expires)
}
// PresignedPostPolicy - Returns POST form data to upload an object at a location.
@ -113,29 +100,38 @@ func (c Client) PresignedPostPolicy(p *PostPolicy) (map[string]string, error) {
p.formData["AWSAccessKeyId"] = c.accessKeyID
}
// Sign the policy.
p.formData["signature"] = PostPresignSignatureV2(policyBase64, c.secretAccessKey)
p.formData["signature"] = postPresignSignatureV2(policyBase64, c.secretAccessKey)
return p.formData, nil
}
// Add date policy.
p.addNewPolicy(policyCondition{
if err = p.addNewPolicy(policyCondition{
matchType: "eq",
condition: "$x-amz-date",
value: t.Format(iso8601DateFormat),
})
}); err != nil {
return nil, err
}
// Add algorithm policy.
p.addNewPolicy(policyCondition{
if err = p.addNewPolicy(policyCondition{
matchType: "eq",
condition: "$x-amz-algorithm",
value: signV4Algorithm,
})
}); err != nil {
return nil, err
}
// Add a credential policy.
credential := getCredential(c.accessKeyID, location, t)
p.addNewPolicy(policyCondition{
if err = p.addNewPolicy(policyCondition{
matchType: "eq",
condition: "$x-amz-credential",
value: credential,
})
}); err != nil {
return nil, err
}
// Get base64 encoded policy.
policyBase64 := p.base64()
// Fill in the form data.
@ -143,6 +139,6 @@ func (c Client) PresignedPostPolicy(p *PostPolicy) (map[string]string, error) {
p.formData["x-amz-algorithm"] = signV4Algorithm
p.formData["x-amz-credential"] = credential
p.formData["x-amz-date"] = t.Format(iso8601DateFormat)
p.formData["x-amz-signature"] = PostPresignSignatureV4(policyBase64, t, c.secretAccessKey, location)
p.formData["x-amz-signature"] = postPresignSignatureV4(policyBase64, t, c.secretAccessKey, location)
return p.formData, nil
}

View File

@ -42,11 +42,6 @@ import (
// For Amazon S3 for more supported regions - http://docs.aws.amazon.com/general/latest/gr/rande.html
// For Google Cloud Storage for more supported regions - https://cloud.google.com/storage/docs/bucket-locations
func (c Client) MakeBucket(bucketName string, acl BucketACL, location string) error {
// Validate if request is made on anonymous requests.
if c.anonymous {
return ErrInvalidArgument("Make bucket cannot be issued with anonymous credentials.")
}
// Validate the input arguments.
if err := isValidBucketName(bucketName); err != nil {
return err
@ -75,11 +70,11 @@ func (c Client) MakeBucket(bucketName string, acl BucketACL, location string) er
if resp != nil {
if resp.StatusCode != http.StatusOK {
return HTTPRespToErrorResponse(resp, bucketName, "")
return httpRespToErrorResponse(resp, bucketName, "")
}
}
// Save the location into cache on a succesful makeBucket response.
// Save the location into cache on a successfull makeBucket response.
c.bucketLocCache.Set(bucketName, location)
// Return.
@ -96,19 +91,14 @@ func (c Client) makeBucketRequest(bucketName string, acl BucketACL, location str
return nil, ErrInvalidArgument("Unrecognized ACL " + acl.String())
}
// Set get bucket location always as path style.
// In case of Amazon S3. The make bucket issued on already
// existing bucket would fail with 'AuthorizationMalformed' error
// if virtual style is used. So we default to 'path style' as that
// is the preferred method here. The final location of the
// 'bucket' is provided through XML LocationConstraint data with
// the request.
targetURL := *c.endpointURL
if bucketName != "" {
// If endpoint supports virtual host style use that always.
// Currently only S3 and Google Cloud Storage would support this.
if isVirtualHostSupported(c.endpointURL) {
targetURL.Host = bucketName + "." + c.endpointURL.Host
targetURL.Path = "/"
} else {
// If not fall back to using path style.
targetURL.Path = "/" + bucketName
}
}
targetURL.Path = "/" + bucketName + "/"
// get a new HTTP request for the method.
req, err := http.NewRequest("PUT", targetURL.String(), nil)
@ -151,9 +141,9 @@ func (c Client) makeBucketRequest(bucketName string, acl BucketACL, location str
if c.signature.isV4() {
// Signature calculated for MakeBucket request should be for 'us-east-1',
// regardless of the bucket's location constraint.
req = SignV4(*req, c.accessKeyID, c.secretAccessKey, "us-east-1")
req = signV4(*req, c.accessKeyID, c.secretAccessKey, "us-east-1")
} else if c.signature.isV2() {
req = SignV2(*req, c.accessKeyID, c.secretAccessKey)
req = signV2(*req, c.accessKeyID, c.secretAccessKey)
}
// Return signed request.
@ -210,7 +200,7 @@ func (c Client) SetBucketACL(bucketName string, acl BucketACL) error {
if resp != nil {
// if error return.
if resp.StatusCode != http.StatusOK {
return HTTPRespToErrorResponse(resp, bucketName, "")
return httpRespToErrorResponse(resp, bucketName, "")
}
}

View File

@ -21,6 +21,7 @@ import (
"crypto/sha256"
"hash"
"io"
"math"
"os"
)
@ -42,8 +43,93 @@ func isReadAt(reader io.Reader) (ok bool) {
return
}
// hashCopyN - Calculates Md5sum and SHA256sum for upto partSize amount of bytes.
func (c Client) hashCopyN(writer io.ReadWriteSeeker, reader io.Reader, partSize int64) (md5Sum, sha256Sum []byte, size int64, err error) {
// shouldUploadPart - verify if part should be uploaded.
func shouldUploadPart(objPart objectPart, objectParts map[int]objectPart) bool {
// If part not found should upload the part.
uploadedPart, found := objectParts[objPart.PartNumber]
if !found {
return true
}
// if size mismatches should upload the part.
if objPart.Size != uploadedPart.Size {
return true
}
// if md5sum mismatches should upload the part.
if objPart.ETag == uploadedPart.ETag {
return true
}
return false
}
// optimalPartInfo - calculate the optimal part info for a given
// object size.
//
// NOTE: Assumption here is that for any object to be uploaded to any S3 compatible
// object storage it will have the following parameters as constants.
//
// maxPartsCount - 10000
// minPartSize - 5MiB
// maxMultipartPutObjectSize - 5TiB
//
func optimalPartInfo(objectSize int64) (totalPartsCount int, partSize int64, lastPartSize int64, err error) {
// object size is '-1' set it to 5TiB.
if objectSize == -1 {
objectSize = maxMultipartPutObjectSize
}
// object size is larger than supported maximum.
if objectSize > maxMultipartPutObjectSize {
err = ErrEntityTooLarge(objectSize, maxMultipartPutObjectSize, "", "")
return
}
// Use floats for part size for all calculations to avoid
// overflows during float64 to int64 conversions.
partSizeFlt := math.Ceil(float64(objectSize / maxPartsCount))
partSizeFlt = math.Ceil(partSizeFlt/minPartSize) * minPartSize
// Total parts count.
totalPartsCount = int(math.Ceil(float64(objectSize) / partSizeFlt))
// Part size.
partSize = int64(partSizeFlt)
// Last part size.
lastPartSize = objectSize - int64(totalPartsCount-1)*partSize
return totalPartsCount, partSize, lastPartSize, nil
}
// hashCopyBuffer is identical to hashCopyN except that it stages
// through the provided buffer (if one is required) rather than
// allocating a temporary one. If buf is nil, one is allocated for 5MiB.
func (c Client) hashCopyBuffer(writer io.Writer, reader io.Reader, buf []byte) (md5Sum, sha256Sum []byte, size int64, err error) {
// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash
// MD5 and SHA256 hasher.
hashMD5 = md5.New()
hashWriter := io.MultiWriter(writer, hashMD5)
if c.signature.isV4() {
hashSHA256 = sha256.New()
hashWriter = io.MultiWriter(writer, hashMD5, hashSHA256)
}
// Allocate buf if not initialized.
if buf == nil {
buf = make([]byte, optimalReadBufferSize)
}
// Using io.CopyBuffer to copy in large buffers, default buffer
// for io.Copy of 32KiB is too small.
size, err = io.CopyBuffer(hashWriter, reader, buf)
if err != nil {
return nil, nil, 0, err
}
// Finalize md5 sum and sha256 sum.
md5Sum = hashMD5.Sum(nil)
if c.signature.isV4() {
sha256Sum = hashSHA256.Sum(nil)
}
return md5Sum, sha256Sum, size, err
}
// hashCopyN - Calculates Md5sum and SHA256sum for up to partSize amount of bytes.
func (c Client) hashCopyN(writer io.Writer, reader io.Reader, partSize int64) (md5Sum, sha256Sum []byte, size int64, err error) {
// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash
// MD5 and SHA256 hasher.
@ -63,11 +149,6 @@ func (c Client) hashCopyN(writer io.ReadWriteSeeker, reader io.Reader, partSize
}
}
// Seek back to beginning of input, any error fail right here.
if _, err := writer.Seek(0, 0); err != nil {
return nil, nil, 0, err
}
// Finalize md5shum and sha256 sum.
md5Sum = hashMD5.Sum(nil)
if c.signature.isV4() {
@ -111,8 +192,12 @@ func (c Client) getUploadID(bucketName, objectName, contentType string) (uploadI
return uploadID, isNew, nil
}
// computeHash - Calculates MD5 and SHA256 for an input read Seeker.
func (c Client) computeHash(reader io.ReadSeeker) (md5Sum, sha256Sum []byte, size int64, err error) {
// computeHashBuffer - Calculates MD5 and SHA256 for an input read
// Seeker is identical to computeHash except that it stages
// through the provided buffer (if one is required) rather than
// allocating a temporary one. If buf is nil, it uses a temporary
// buffer.
func (c Client) computeHashBuffer(reader io.ReadSeeker, buf []byte) (md5Sum, sha256Sum []byte, size int64, err error) {
// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash
// MD5 and SHA256 hasher.
@ -123,9 +208,17 @@ func (c Client) computeHash(reader io.ReadSeeker) (md5Sum, sha256Sum []byte, siz
hashWriter = io.MultiWriter(hashMD5, hashSHA256)
}
size, err = io.Copy(hashWriter, reader)
if err != nil {
return nil, nil, 0, err
// If no buffer is provided, no need to allocate just use io.Copy.
if buf == nil {
size, err = io.Copy(hashWriter, reader)
if err != nil {
return nil, nil, 0, err
}
} else {
size, err = io.CopyBuffer(hashWriter, reader, buf)
if err != nil {
return nil, nil, 0, err
}
}
// Seek back reader to the beginning location.
@ -141,27 +234,7 @@ func (c Client) computeHash(reader io.ReadSeeker) (md5Sum, sha256Sum []byte, siz
return md5Sum, sha256Sum, size, nil
}
// Fetch all parts info, including total uploaded size, maximum part
// size and max part number.
func (c Client) getPartsInfo(bucketName, objectName, uploadID string) (prtsInfo map[int]objectPart, totalSize int64, maxPrtSize int64, maxPrtNumber int, err error) {
// Fetch previously upload parts.
prtsInfo, err = c.listObjectParts(bucketName, objectName, uploadID)
if err != nil {
return nil, 0, 0, 0, err
}
// Peek through all the parts and calculate totalSize, maximum
// part size and last part number.
for _, prtInfo := range prtsInfo {
// Save previously uploaded size.
totalSize += prtInfo.Size
// Choose the maximum part size.
if prtInfo.Size >= maxPrtSize {
maxPrtSize = prtInfo.Size
}
// Choose the maximum part number.
if maxPrtNumber < prtInfo.PartNumber {
maxPrtNumber = prtInfo.PartNumber
}
}
return prtsInfo, totalSize, maxPrtSize, maxPrtNumber, nil
// computeHash - Calculates MD5 and SHA256 for an input read Seeker.
func (c Client) computeHash(reader io.ReadSeeker) (md5Sum, sha256Sum []byte, size int64, err error) {
return c.computeHashBuffer(reader, nil)
}

View File

@ -54,7 +54,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
// Check for largest object size allowed.
if fileSize > int64(maxMultipartPutObjectSize) {
return 0, ErrEntityTooLarge(fileSize, bucketName, objectName)
return 0, ErrEntityTooLarge(fileSize, maxMultipartPutObjectSize, bucketName, objectName)
}
// NOTE: Google Cloud Storage multipart Put is not compatible with Amazon S3 APIs.
@ -68,8 +68,8 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
BucketName: bucketName,
}
}
// Do not compute MD5 for Google Cloud Storage. Uploads upto 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, contentType)
// Do not compute MD5 for Google Cloud Storage. Uploads up to 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, contentType, nil)
}
// NOTE: S3 doesn't allow anonymous multipart requests.
@ -82,16 +82,17 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
BucketName: bucketName,
}
}
// Do not compute MD5 for anonymous requests to Amazon S3. Uploads upto 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, contentType)
// Do not compute MD5 for anonymous requests to Amazon
// S3. Uploads up to 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, contentType, nil)
}
// Small object upload is initiated for uploads for input data size smaller than 5MiB.
if fileSize < minimumPartSize {
return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType)
if fileSize < minPartSize && fileSize >= 0 {
return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType, nil)
}
// Upload all large objects as multipart.
n, err = c.putObjectMultipartFromFile(bucketName, objectName, fileReader, fileSize, contentType)
n, err = c.putObjectMultipartFromFile(bucketName, objectName, fileReader, fileSize, contentType, nil)
if err != nil {
errResp := ToErrorResponse(err)
// Verify if multipart functionality is not available, if not
@ -99,10 +100,10 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
if errResp.Code == "NotImplemented" {
// If size of file is greater than '5GiB' fail.
if fileSize > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(fileSize, bucketName, objectName)
return 0, ErrEntityTooLarge(fileSize, maxSinglePutObjectSize, bucketName, objectName)
}
// Fall back to uploading as single PutObject operation.
return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType)
return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType, nil)
}
return n, err
}
@ -117,7 +118,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
// against MD5SUM of each individual parts. This function also
// effectively utilizes file system capabilities of reading from
// specific sections and not having to create temporary files.
func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileReader *os.File, fileSize int64, contentType string) (int64, error) {
func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileReader io.ReaderAt, fileSize int64, contentType string, progress io.Reader) (int64, error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
@ -139,9 +140,6 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
// Complete multipart upload.
var completeMultipartUpload completeMultipartUpload
// Previous maximum part size
var prevMaxPartSize int64
// A map of all uploaded parts.
var partsInfo = make(map[int]objectPart)
@ -149,52 +147,67 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
// previously uploaded parts info.
if !isNew {
// Fetch previously upload parts and maximum part size.
partsInfo, _, prevMaxPartSize, _, err = c.getPartsInfo(bucketName, objectName, uploadID)
partsInfo, err = c.listObjectParts(bucketName, objectName, uploadID)
if err != nil {
return 0, err
}
}
// Calculate the optimal part size for a given file size.
partSize := optimalPartSize(fileSize)
// Use prevMaxPartSize if available.
if prevMaxPartSize != 0 {
partSize = prevMaxPartSize
// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, _, err := optimalPartInfo(fileSize)
if err != nil {
return 0, err
}
// Part number always starts with '0'.
partNumber := 0
// Upload each part until fileSize.
for totalUploadedSize < fileSize {
// Increment part number.
partNumber++
// Part number always starts with '1'.
partNumber := 1
for partNumber <= totalPartsCount {
// Get a section reader on a particular offset.
sectionReader := io.NewSectionReader(fileReader, totalUploadedSize, partSize)
// Calculates MD5 and SHA256 sum for a section reader.
md5Sum, sha256Sum, size, err := c.computeHash(sectionReader)
var md5Sum, sha256Sum []byte
var prtSize int64
md5Sum, sha256Sum, prtSize, err = c.computeHash(sectionReader)
if err != nil {
return 0, err
}
// Verify if part was not uploaded.
if !isPartUploaded(objectPart{
var reader io.Reader
// Update progress reader appropriately to the latest offset
// as we read from the source.
reader = newHook(sectionReader, progress)
// Verify if part should be uploaded.
if shouldUploadPart(objectPart{
ETag: hex.EncodeToString(md5Sum),
PartNumber: partNumber,
Size: prtSize,
}, partsInfo) {
// Proceed to upload the part.
objPart, err := c.uploadPart(bucketName, objectName, uploadID, ioutil.NopCloser(sectionReader), partNumber, md5Sum, sha256Sum, size)
var objPart objectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, ioutil.NopCloser(reader), partNumber,
md5Sum, sha256Sum, prtSize)
if err != nil {
return totalUploadedSize, err
}
// Save successfully uploaded part metadata.
partsInfo[partNumber] = objPart
} else {
// Update the progress reader for the skipped part.
if progress != nil {
if _, err = io.CopyN(ioutil.Discard, progress, prtSize); err != nil {
return totalUploadedSize, err
}
}
}
// Save successfully uploaded size.
totalUploadedSize += size
totalUploadedSize += prtSize
// Increment part number.
partNumber++
}
// Verify if we uploaded all data.
@ -210,8 +223,8 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart)
}
// Verify if partNumber is different than total list of parts.
if partNumber != len(completeMultipartUpload.Parts) {
// Verify if totalPartsCount is not equal to total list of parts.
if totalPartsCount != len(completeMultipartUpload.Parts) {
return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts))
}

View File

@ -41,11 +41,11 @@ import (
// If we exhaust all the known types, code proceeds to use stream as
// is where each part is re-downloaded, checksummed and verified
// before upload.
func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64, contentType string) (n int64, err error) {
if size > 0 && size >= minimumPartSize {
func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64, contentType string, progress io.Reader) (n int64, err error) {
if size > 0 && size >= minPartSize {
// Verify if reader is *os.File, then use file system functionalities.
if isFile(reader) {
return c.putObjectMultipartFromFile(bucketName, objectName, reader.(*os.File), size, contentType)
return c.putObjectMultipartFromFile(bucketName, objectName, reader.(*os.File), size, contentType, progress)
}
// Verify if reader is *minio.Object or io.ReaderAt.
// NOTE: Verification of object is kept for a specific purpose
@ -54,17 +54,17 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
// and such a functionality is used in the subsequent code
// path.
if isObject(reader) || isReadAt(reader) {
return c.putObjectMultipartFromReadAt(bucketName, objectName, reader.(io.ReaderAt), size, contentType)
return c.putObjectMultipartFromReadAt(bucketName, objectName, reader.(io.ReaderAt), size, contentType, progress)
}
}
// For any other data size and reader type we do generic multipart
// approach by staging data in temporary files and uploading them.
return c.putObjectMultipartStream(bucketName, objectName, reader, size, contentType)
return c.putObjectMultipartStream(bucketName, objectName, reader, size, contentType, progress)
}
// putObjectStream uploads files bigger than 5MiB, and also supports
// special case where size is unknown i.e '-1'.
func (c Client) putObjectMultipartStream(bucketName, objectName string, reader io.Reader, size int64, contentType string) (n int64, err error) {
func (c Client) putObjectMultipartStream(bucketName, objectName string, reader io.Reader, size int64, contentType string, progress io.Reader) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
@ -73,6 +73,15 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
return 0, err
}
// Total data read and written to server. should be equal to 'size' at the end of the call.
var totalUploadedSize int64
// Complete multipart upload.
var complMultipartUpload completeMultipartUpload
// A map of all previously uploaded parts.
var partsInfo = make(map[int]objectPart)
// getUploadID for an object, initiates a new multipart request
// if it cannot find any previously partially uploaded object.
uploadID, isNew, err := c.getUploadID(bucketName, objectName, contentType)
@ -80,83 +89,83 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
return 0, err
}
// Total data read and written to server. should be equal to 'size' at the end of the call.
var totalUploadedSize int64
// Complete multipart upload.
var completeMultipartUpload completeMultipartUpload
// Previous maximum part size
var prevMaxPartSize int64
// A map of all previously uploaded parts.
var partsInfo = make(map[int]objectPart)
// If This session is a continuation of a previous session fetch all
// previously uploaded parts info.
if !isNew {
// Fetch previously uploaded parts and maximum part size.
partsInfo, _, prevMaxPartSize, _, err = c.getPartsInfo(bucketName, objectName, uploadID)
partsInfo, err = c.listObjectParts(bucketName, objectName, uploadID)
if err != nil {
return 0, err
}
}
// Calculate the optimal part size for a given size.
partSize := optimalPartSize(size)
// Use prevMaxPartSize if available.
if prevMaxPartSize != 0 {
partSize = prevMaxPartSize
// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, _, err := optimalPartInfo(size)
if err != nil {
return 0, err
}
// Part number always starts with '0'.
partNumber := 0
// Part number always starts with '1'.
partNumber := 1
// Upload each part until EOF.
for {
// Increment part number.
partNumber++
// Initialize a temporary buffer.
tmpBuffer := new(bytes.Buffer)
// Initialize a new temporary file.
tmpFile, err := newTempFile("multiparts$-putobject-stream")
if err != nil {
return 0, err
}
// Calculates MD5 and SHA256 sum while copying partSize bytes into tmpFile.
md5Sum, sha256Sum, size, rErr := c.hashCopyN(tmpFile, reader, partSize)
for partNumber <= totalPartsCount {
// Calculates MD5 and SHA256 sum while copying partSize bytes
// into tmpBuffer.
md5Sum, sha256Sum, prtSize, rErr := c.hashCopyN(tmpBuffer, reader, partSize)
if rErr != nil {
if rErr != io.EOF {
return 0, rErr
}
}
// Verify if part was not uploaded.
if !isPartUploaded(objectPart{
var reader io.Reader
// Update progress reader appropriately to the latest offset
// as we read from the source.
reader = newHook(tmpBuffer, progress)
// Verify if part should be uploaded.
if shouldUploadPart(objectPart{
ETag: hex.EncodeToString(md5Sum),
PartNumber: partNumber,
Size: prtSize,
}, partsInfo) {
// Proceed to upload the part.
objPart, err := c.uploadPart(bucketName, objectName, uploadID, tmpFile, partNumber, md5Sum, sha256Sum, size)
var objPart objectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, ioutil.NopCloser(reader), partNumber,
md5Sum, sha256Sum, prtSize)
if err != nil {
// Close the temporary file upon any error.
tmpFile.Close()
return 0, err
// Reset the temporary buffer upon any error.
tmpBuffer.Reset()
return totalUploadedSize, err
}
// Save successfully uploaded part metadata.
partsInfo[partNumber] = objPart
} else {
// Update the progress reader for the skipped part.
if progress != nil {
if _, err = io.CopyN(ioutil.Discard, progress, prtSize); err != nil {
return totalUploadedSize, err
}
}
}
// Close the temporary file.
tmpFile.Close()
// Reset the temporary buffer.
tmpBuffer.Reset()
// Save successfully uploaded size.
totalUploadedSize += size
totalUploadedSize += prtSize
// If read error was an EOF, break out of the loop.
if rErr == io.EOF {
// For unknown size, Read EOF we break away.
// We do not have to upload till totalPartsCount.
if size < 0 && rErr == io.EOF {
break
}
// Increment part number.
partNumber++
}
// Verify if we uploaded all the data.
@ -171,17 +180,19 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
var complPart completePart
complPart.ETag = part.ETag
complPart.PartNumber = part.PartNumber
completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart)
complMultipartUpload.Parts = append(complMultipartUpload.Parts, complPart)
}
// Verify if partNumber is different than total list of parts.
if partNumber != len(completeMultipartUpload.Parts) {
return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts))
if size > 0 {
// Verify if totalPartsCount is not equal to total list of parts.
if totalPartsCount != len(complMultipartUpload.Parts) {
return totalUploadedSize, ErrInvalidParts(partNumber, len(complMultipartUpload.Parts))
}
}
// Sort all completed parts.
sort.Sort(completedParts(completeMultipartUpload.Parts))
_, err = c.completeMultipartUpload(bucketName, objectName, uploadID, completeMultipartUpload)
sort.Sort(completedParts(complMultipartUpload.Parts))
_, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload)
if err != nil {
return totalUploadedSize, err
}
@ -233,7 +244,7 @@ func (c Client) initiateMultipartUpload(bucketName, objectName, contentType stri
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return initiateMultipartUploadResult{}, HTTPRespToErrorResponse(resp, bucketName, objectName)
return initiateMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName)
}
}
// Decode xml for new multipart upload.
@ -255,7 +266,7 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Re
return objectPart{}, err
}
if size > maxPartSize {
return objectPart{}, ErrEntityTooLarge(size, bucketName, objectName)
return objectPart{}, ErrEntityTooLarge(size, maxPartSize, bucketName, objectName)
}
if size <= -1 {
return objectPart{}, ErrEntityTooSmall(size, bucketName, objectName)
@ -297,7 +308,7 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Re
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return objectPart{}, HTTPRespToErrorResponse(resp, bucketName, objectName)
return objectPart{}, httpRespToErrorResponse(resp, bucketName, objectName)
}
}
// Once successfully uploaded, return completed part.
@ -355,7 +366,7 @@ func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string,
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return completeMultipartUploadResult{}, HTTPRespToErrorResponse(resp, bucketName, objectName)
return completeMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName)
}
}
// Decode completed multipart upload response on success.

View File

@ -0,0 +1,105 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
import "io"
// PutObjectWithProgress - With progress.
func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.Reader, contentType string, progress io.Reader) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
}
if err := isValidObjectName(objectName); err != nil {
return 0, err
}
if reader == nil {
return 0, ErrInvalidArgument("Input reader is invalid, cannot be nil.")
}
// Size of the object.
var size int64
// Get reader size.
size, err = getReaderSize(reader)
if err != nil {
return 0, err
}
// Check for largest object size allowed.
if size > int64(maxMultipartPutObjectSize) {
return 0, ErrEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName)
}
// NOTE: Google Cloud Storage does not implement Amazon S3 Compatible multipart PUT.
// So we fall back to single PUT operation with the maximum limit of 5GiB.
if isGoogleEndpoint(c.endpointURL) {
if size <= -1 {
return 0, ErrorResponse{
Code: "NotImplemented",
Message: "Content-Length cannot be negative for file uploads to Google Cloud Storage.",
Key: objectName,
BucketName: bucketName,
}
}
if size > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// Do not compute MD5 for Google Cloud Storage. Uploads up to 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, reader, size, contentType, progress)
}
// NOTE: S3 doesn't allow anonymous multipart requests.
if isAmazonEndpoint(c.endpointURL) && c.anonymous {
if size <= -1 {
return 0, ErrorResponse{
Code: "NotImplemented",
Message: "Content-Length cannot be negative for anonymous requests.",
Key: objectName,
BucketName: bucketName,
}
}
if size > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// Do not compute MD5 for anonymous requests to Amazon
// S3. Uploads up to 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, reader, size, contentType, progress)
}
// putSmall object.
if size < minPartSize && size >= 0 {
return c.putObjectSingle(bucketName, objectName, reader, size, contentType, progress)
}
// For all sizes greater than 5MiB do multipart.
n, err = c.putObjectMultipart(bucketName, objectName, reader, size, contentType, progress)
if err != nil {
errResp := ToErrorResponse(err)
// Verify if multipart functionality is not available, if not
// fall back to single PutObject operation.
if errResp.Code == "NotImplemented" {
// Verify if size of reader is greater than '5GiB'.
if size > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// Fall back to uploading as single PutObject operation.
return c.putObjectSingle(bucketName, objectName, reader, size, contentType, progress)
}
return n, err
}
return n, nil
}

View File

@ -17,14 +17,26 @@
package minio
import (
"crypto/md5"
"crypto/sha256"
"errors"
"hash"
"bytes"
"io"
"io/ioutil"
"sort"
)
// shouldUploadPartReadAt - verify if part should be uploaded.
func shouldUploadPartReadAt(objPart objectPart, objectParts map[int]objectPart) bool {
// If part not found part should be uploaded.
uploadedPart, found := objectParts[objPart.PartNumber]
if !found {
return true
}
// if size mismatches part should be uploaded.
if uploadedPart.Size != objPart.Size {
return true
}
return false
}
// putObjectMultipartFromReadAt - Uploads files bigger than 5MiB. Supports reader
// of type which implements io.ReaderAt interface (ReadAt method).
//
@ -35,7 +47,7 @@ import (
// temporary files for staging all the data, these temporary files are
// cleaned automatically when the caller i.e http client closes the
// stream after uploading all the contents successfully.
func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, reader io.ReaderAt, size int64, contentType string) (n int64, err error) {
func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, reader io.ReaderAt, size int64, contentType string, progress io.Reader) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
@ -55,121 +67,114 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
var totalUploadedSize int64
// Complete multipart upload.
var completeMultipartUpload completeMultipartUpload
// Previous maximum part size
var prevMaxPartSize int64
// Previous part number.
var prevPartNumber int
var complMultipartUpload completeMultipartUpload
// A map of all uploaded parts.
var partsInfo = make(map[int]objectPart)
// Fetch all parts info previously uploaded.
if !isNew {
partsInfo, totalUploadedSize, prevMaxPartSize, prevPartNumber, err = c.getPartsInfo(bucketName, objectName, uploadID)
partsInfo, err = c.listObjectParts(bucketName, objectName, uploadID)
if err != nil {
return 0, err
}
}
// Calculate the optimal part size for a given file size.
partSize := optimalPartSize(size)
// If prevMaxPartSize is set use that.
if prevMaxPartSize != 0 {
partSize = prevMaxPartSize
// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size)
if err != nil {
return 0, err
}
// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash
// Used for readability, lastPartNumber is always
// totalPartsCount.
lastPartNumber := totalPartsCount
// Part number always starts with prevPartNumber + 1. i.e The next part number.
partNumber := prevPartNumber + 1
// partNumber always starts with '1'.
partNumber := 1
// Upload each part until totalUploadedSize reaches input reader size.
for totalUploadedSize < size {
// Initialize a new temporary file.
tmpFile, err := newTempFile("multiparts$-putobject-partial")
if err != nil {
return 0, err
// Initialize a temporary buffer.
tmpBuffer := new(bytes.Buffer)
// Read defaults to reading at 5MiB buffer.
readBuffer := make([]byte, optimalReadBufferSize)
// Upload all the missing parts.
for partNumber <= lastPartNumber {
// Verify object if its uploaded.
verifyObjPart := objectPart{
PartNumber: partNumber,
Size: partSize,
}
// Special case if we see a last part number, save last part
// size as the proper part size.
if partNumber == lastPartNumber {
verifyObjPart = objectPart{
PartNumber: lastPartNumber,
Size: lastPartSize,
}
}
// Create a hash multiwriter.
hashMD5 = md5.New()
hashWriter := io.MultiWriter(hashMD5)
if c.signature.isV4() {
hashSHA256 = sha256.New()
hashWriter = io.MultiWriter(hashMD5, hashSHA256)
}
writer := io.MultiWriter(tmpFile, hashWriter)
// Choose totalUploadedSize as the current readAtOffset.
readAtOffset := totalUploadedSize
// Read until partSize.
var totalReadPartSize int64
// ReadAt defaults to reading at 5MiB buffer.
readAtBuffer := make([]byte, optimalReadAtBufferSize)
// Following block reads data at an offset from the input
// reader and copies data to into local temporary file.
// Temporary file data is limited to the partSize.
for totalReadPartSize < partSize {
readAtSize, rerr := reader.ReadAt(readAtBuffer, readAtOffset)
if rerr != nil {
if rerr != io.EOF {
return 0, rerr
// Verify if part should be uploaded.
if !shouldUploadPartReadAt(verifyObjPart, partsInfo) {
// Increment part number when not uploaded.
partNumber++
if progress != nil {
// Update the progress reader for the skipped part.
if _, err = io.CopyN(ioutil.Discard, progress, verifyObjPart.Size); err != nil {
return 0, err
}
}
writeSize, werr := writer.Write(readAtBuffer[:readAtSize])
if werr != nil {
return 0, werr
}
if readAtSize != writeSize {
return 0, errors.New("Something really bad happened here. " + reportIssue)
}
readAtOffset += int64(writeSize)
totalReadPartSize += int64(writeSize)
if rerr == io.EOF {
break
}
continue
}
// Seek back to beginning of the temporary file.
if _, err := tmpFile.Seek(0, 0); err != nil {
// If partNumber was not uploaded we calculate the missing
// part offset and size. For all other part numbers we
// calculate offset based on multiples of partSize.
readOffset := int64(partNumber-1) * partSize
missingPartSize := partSize
// As a special case if partNumber is lastPartNumber, we
// calculate the offset based on the last part size.
if partNumber == lastPartNumber {
readOffset = (size - lastPartSize)
missingPartSize = lastPartSize
}
// Get a section reader on a particular offset.
sectionReader := io.NewSectionReader(reader, readOffset, missingPartSize)
// Calculates MD5 and SHA256 sum for a section reader.
var md5Sum, sha256Sum []byte
var prtSize int64
md5Sum, sha256Sum, prtSize, err = c.hashCopyBuffer(tmpBuffer, sectionReader, readBuffer)
if err != nil {
return 0, err
}
var md5Sum, sha256Sum []byte
md5Sum = hashMD5.Sum(nil)
// Signature version '4'.
if c.signature.isV4() {
sha256Sum = hashSHA256.Sum(nil)
}
var reader io.Reader
// Update progress reader appropriately to the latest offset
// as we read from the source.
reader = newHook(tmpBuffer, progress)
// Proceed to upload the part.
objPart, err := c.uploadPart(bucketName, objectName, uploadID, tmpFile, partNumber, md5Sum, sha256Sum, totalReadPartSize)
var objPart objectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, ioutil.NopCloser(reader),
partNumber, md5Sum, sha256Sum, prtSize)
if err != nil {
// Close the read closer.
tmpFile.Close()
return totalUploadedSize, err
// Reset the buffer upon any error.
tmpBuffer.Reset()
return 0, err
}
// Save successfully uploaded size.
totalUploadedSize += totalReadPartSize
// Save successfully uploaded part metadata.
partsInfo[partNumber] = objPart
// Move to next part.
// Increment part number here after successful part upload.
partNumber++
}
// Verify if we uploaded all the data.
if totalUploadedSize != size {
return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
// Reset the buffer.
tmpBuffer.Reset()
}
// Loop over uploaded parts to save them in a Parts array before completing the multipart request.
@ -177,12 +182,23 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
var complPart completePart
complPart.ETag = part.ETag
complPart.PartNumber = part.PartNumber
completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart)
totalUploadedSize += part.Size
complMultipartUpload.Parts = append(complMultipartUpload.Parts, complPart)
}
// Verify if we uploaded all the data.
if totalUploadedSize != size {
return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
}
// Verify if totalPartsCount is not equal to total list of parts.
if totalPartsCount != len(complMultipartUpload.Parts) {
return totalUploadedSize, ErrInvalidParts(totalPartsCount, len(complMultipartUpload.Parts))
}
// Sort all completed parts.
sort.Sort(completedParts(completeMultipartUpload.Parts))
_, err = c.completeMultipartUpload(bucketName, objectName, uploadID, completeMultipartUpload)
sort.Sort(completedParts(complMultipartUpload.Parts))
_, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload)
if err != nil {
return totalUploadedSize, err
}

View File

@ -22,37 +22,87 @@ import (
"io/ioutil"
"net/http"
"os"
"reflect"
"runtime"
"strings"
)
// getReaderSize gets the size of the underlying reader, if possible.
// getReaderSize - Determine the size of Reader if available.
func getReaderSize(reader io.Reader) (size int64, err error) {
var result []reflect.Value
size = -1
if reader != nil {
switch v := reader.(type) {
case *bytes.Buffer:
size = int64(v.Len())
case *bytes.Reader:
size = int64(v.Len())
case *strings.Reader:
size = int64(v.Len())
case *os.File:
var st os.FileInfo
st, err = v.Stat()
if err != nil {
return 0, err
// Verify if there is a method by name 'Size'.
lenFn := reflect.ValueOf(reader).MethodByName("Size")
if lenFn.IsValid() {
if lenFn.Kind() == reflect.Func {
// Call the 'Size' function and save its return value.
result = lenFn.Call([]reflect.Value{})
if result != nil && len(result) == 1 {
lenValue := result[0]
if lenValue.IsValid() {
switch lenValue.Kind() {
case reflect.Int:
fallthrough
case reflect.Int8:
fallthrough
case reflect.Int16:
fallthrough
case reflect.Int32:
fallthrough
case reflect.Int64:
size = lenValue.Int()
}
}
}
}
size = st.Size()
case *Object:
var st ObjectInfo
st, err = v.Stat()
if err != nil {
return 0, err
} else {
// Fallback to Stat() method, two possible Stat() structs
// exist.
switch v := reader.(type) {
case *os.File:
var st os.FileInfo
st, err = v.Stat()
if err != nil {
// Handle this case specially for "windows",
// certain files for example 'Stdin', 'Stdout' and
// 'Stderr' it is not allowed to fetch file information.
if runtime.GOOS == "windows" {
if strings.Contains(err.Error(), "GetFileInformationByHandle") {
return -1, nil
}
}
return
}
// Ignore if input is a directory, throw an error.
if st.Mode().IsDir() {
return -1, ErrInvalidArgument("Input file cannot be a directory.")
}
// Ignore 'Stdin', 'Stdout' and 'Stderr', since they
// represent *os.File type but internally do not
// implement Seekable calls. Ignore them and treat
// them like a stream with unknown length.
switch st.Name() {
case "stdin":
fallthrough
case "stdout":
fallthrough
case "stderr":
return
}
size = st.Size()
case *Object:
var st ObjectInfo
st, err = v.Stat()
if err != nil {
return
}
size = st.Size
}
size = st.Size
}
}
return size, nil
// Returns the size here.
return size, err
}
// completedParts is a collection of parts sortable by their part numbers.
@ -77,86 +127,12 @@ func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].Part
//
// NOTE: For anonymous requests Amazon S3 doesn't allow multipart upload. So we fall back to single PUT operation.
func (c Client) PutObject(bucketName, objectName string, reader io.Reader, contentType string) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
}
if err := isValidObjectName(objectName); err != nil {
return 0, err
}
// get reader size.
size, err := getReaderSize(reader)
if err != nil {
return 0, err
}
// Check for largest object size allowed.
if size > int64(maxMultipartPutObjectSize) {
return 0, ErrEntityTooLarge(size, bucketName, objectName)
}
// NOTE: Google Cloud Storage does not implement Amazon S3 Compatible multipart PUT.
// So we fall back to single PUT operation with the maximum limit of 5GiB.
if isGoogleEndpoint(c.endpointURL) {
if size <= -1 {
return 0, ErrorResponse{
Code: "NotImplemented",
Message: "Content-Length cannot be negative for file uploads to Google Cloud Storage.",
Key: objectName,
BucketName: bucketName,
}
}
if size > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(size, bucketName, objectName)
}
// Do not compute MD5 for Google Cloud Storage. Uploads upto 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, reader, size, contentType)
}
// NOTE: S3 doesn't allow anonymous multipart requests.
if isAmazonEndpoint(c.endpointURL) && c.anonymous {
if size <= -1 {
return 0, ErrorResponse{
Code: "NotImplemented",
Message: "Content-Length cannot be negative for anonymous requests.",
Key: objectName,
BucketName: bucketName,
}
}
if size > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(size, bucketName, objectName)
}
// Do not compute MD5 for anonymous requests to Amazon S3. Uploads upto 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, reader, size, contentType)
}
// putSmall object.
if size < minimumPartSize && size > 0 {
return c.putObjectSingle(bucketName, objectName, reader, size, contentType)
}
// For all sizes greater than 5MiB do multipart.
n, err = c.putObjectMultipart(bucketName, objectName, reader, size, contentType)
if err != nil {
errResp := ToErrorResponse(err)
// Verify if multipart functionality is not available, if not
// fall back to single PutObject operation.
if errResp.Code == "NotImplemented" {
// Verify if size of reader is greater than '5GiB'.
if size > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(size, bucketName, objectName)
}
// Fall back to uploading as single PutObject operation.
return c.putObjectSingle(bucketName, objectName, reader, size, contentType)
}
return n, err
}
return n, nil
return c.PutObjectWithProgress(bucketName, objectName, reader, contentType, nil)
}
// putObjectNoChecksum special function used Google Cloud Storage. This special function
// is used for Google Cloud Storage since Google's multipart API is not S3 compatible.
func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Reader, size int64, contentType string) (n int64, err error) {
func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Reader, size int64, contentType string, progress io.Reader) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
@ -165,8 +141,13 @@ func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Rea
return 0, err
}
if size > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(size, bucketName, objectName)
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// Update progress reader appropriately to the latest offset as we
// read from the source.
reader = newHook(reader, progress)
// This function does not calculate sha256 and md5sum for payload.
// Execute put object.
st, err := c.putObjectDo(bucketName, objectName, ioutil.NopCloser(reader), nil, nil, size, contentType)
@ -181,7 +162,7 @@ func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Rea
// putObjectSingle is a special function for uploading single put object request.
// This special function is used as a fallback when multipart upload fails.
func (c Client) putObjectSingle(bucketName, objectName string, reader io.Reader, size int64, contentType string) (n int64, err error) {
func (c Client) putObjectSingle(bucketName, objectName string, reader io.Reader, size int64, contentType string, progress io.Reader) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
@ -190,25 +171,47 @@ func (c Client) putObjectSingle(bucketName, objectName string, reader io.Reader,
return 0, err
}
if size > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(size, bucketName, objectName)
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// If size is a stream, upload upto 5GiB.
// If size is a stream, upload up to 5GiB.
if size <= -1 {
size = maxSinglePutObjectSize
}
// Initialize a new temporary file.
tmpFile, err := newTempFile("single$-putobject-single")
if err != nil {
return 0, err
var md5Sum, sha256Sum []byte
var readCloser io.ReadCloser
if size <= minPartSize {
// Initialize a new temporary buffer.
tmpBuffer := new(bytes.Buffer)
md5Sum, sha256Sum, size, err = c.hashCopyN(tmpBuffer, reader, size)
readCloser = ioutil.NopCloser(tmpBuffer)
} else {
// Initialize a new temporary file.
var tmpFile *tempFile
tmpFile, err = newTempFile("single$-putobject-single")
if err != nil {
return 0, err
}
md5Sum, sha256Sum, size, err = c.hashCopyN(tmpFile, reader, size)
// Seek back to beginning of the temporary file.
if _, err = tmpFile.Seek(0, 0); err != nil {
return 0, err
}
readCloser = tmpFile
}
md5Sum, sha256Sum, size, err := c.hashCopyN(tmpFile, reader, size)
// Return error if its not io.EOF.
if err != nil {
if err != io.EOF {
return 0, err
}
}
// Progress the reader to the size.
if progress != nil {
if _, err = io.CopyN(ioutil.Discard, progress, size); err != nil {
return size, err
}
}
// Execute put object.
st, err := c.putObjectDo(bucketName, objectName, tmpFile, md5Sum, sha256Sum, size, contentType)
st, err := c.putObjectDo(bucketName, objectName, readCloser, md5Sum, sha256Sum, size, contentType)
if err != nil {
return 0, err
}
@ -234,7 +237,7 @@ func (c Client) putObjectDo(bucketName, objectName string, reader io.ReadCloser,
}
if size > maxSinglePutObjectSize {
return ObjectInfo{}, ErrEntityTooLarge(size, bucketName, objectName)
return ObjectInfo{}, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
if strings.TrimSpace(contentType) == "" {
@ -268,7 +271,7 @@ func (c Client) putObjectDo(bucketName, objectName string, reader io.ReadCloser,
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return ObjectInfo{}, HTTPRespToErrorResponse(resp, bucketName, objectName)
return ObjectInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
}
}

View File

@ -45,7 +45,7 @@ func (c Client) RemoveBucket(bucketName string) error {
}
if resp != nil {
if resp.StatusCode != http.StatusNoContent {
return HTTPRespToErrorResponse(resp, bucketName, "")
return httpRespToErrorResponse(resp, bucketName, "")
}
}
@ -158,7 +158,7 @@ func (c Client) abortMultipartUpload(bucketName, objectName, uploadID string) er
AmzBucketRegion: resp.Header.Get("x-amz-bucket-region"),
}
default:
return HTTPRespToErrorResponse(resp, bucketName, objectName)
return httpRespToErrorResponse(resp, bucketName, objectName)
}
return errorResponse
}

View File

@ -44,7 +44,7 @@ func (c Client) BucketExists(bucketName string) error {
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return HTTPRespToErrorResponse(resp, bucketName, "")
return httpRespToErrorResponse(resp, bucketName, "")
}
}
return nil
@ -75,7 +75,7 @@ func (c Client) StatObject(bucketName, objectName string) (ObjectInfo, error) {
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return ObjectInfo{}, HTTPRespToErrorResponse(resp, bucketName, objectName)
return ObjectInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
}
}

View File

@ -17,6 +17,7 @@
package minio
import (
"bytes"
"encoding/base64"
"encoding/hex"
"fmt"
@ -76,7 +77,7 @@ const (
)
// NewV2 - instantiate minio client with Amazon S3 signature version
// '2' compatiblity.
// '2' compatibility.
func NewV2(endpoint string, accessKeyID, secretAccessKey string, insecure bool) (CloudStorageClient, error) {
clnt, err := privateNew(endpoint, accessKeyID, secretAccessKey, insecure)
if err != nil {
@ -180,7 +181,7 @@ func (c *Client) SetCustomTransport(customHTTPTransport http.RoundTripper) {
}
// TraceOn - enable HTTP tracing.
func (c *Client) TraceOn(outputStream io.Writer) error {
func (c *Client) TraceOn(outputStream io.Writer) {
// if outputStream is nil then default to os.Stdout.
if outputStream == nil {
outputStream = os.Stdout
@ -190,7 +191,6 @@ func (c *Client) TraceOn(outputStream io.Writer) error {
// Enable tracing.
c.isTraceEnabled = true
return nil
}
// TraceOff - disable HTTP tracing.
@ -213,6 +213,7 @@ type requestMetadata struct {
expires int64
// Generated by our internal code.
bucketLocation string
contentBody io.ReadCloser
contentLength int64
contentSHA256Bytes []byte
@ -221,7 +222,7 @@ type requestMetadata struct {
// Filter out signature value from Authorization header.
func (c Client) filterSignature(req *http.Request) {
// For anonymous requests return here.
// For anonymous requests, no need to filter.
if c.anonymous {
return
}
@ -285,9 +286,22 @@ func (c Client) dumpHTTP(req *http.Request, resp *http.Response) error {
return err
}
} else {
respTrace, err = httputil.DumpResponse(resp, false)
if err != nil {
return err
// WORKAROUND for https://github.com/golang/go/issues/13942.
// httputil.DumpResponse does not print response headers for
// all successful calls which have response ContentLength set
// to zero. Keep this workaround until the above bug is fixed.
if resp.ContentLength == 0 {
var buffer bytes.Buffer
if err := resp.Header.Write(&buffer); err != nil {
return err
}
respTrace = buffer.Bytes()
respTrace = append(respTrace, []byte("\r\n")...)
} else {
respTrace, err = httputil.DumpResponse(resp, false)
if err != nil {
return err
}
}
}
// Write response to trace output.
@ -324,24 +338,12 @@ func (c Client) do(req *http.Request) (*http.Response, error) {
}
// newRequest - instantiate a new HTTP request for a given method.
func (c Client) newRequest(method string, metadata requestMetadata) (*http.Request, error) {
func (c Client) newRequest(method string, metadata requestMetadata) (req *http.Request, err error) {
// If no method is supplied default to 'POST'.
if method == "" {
method = "POST"
}
// construct a new target URL.
targetURL, err := c.makeTargetURL(metadata.bucketName, metadata.objectName, metadata.queryValues)
if err != nil {
return nil, err
}
// get a new HTTP request for the method.
req, err := http.NewRequest(method, targetURL.String(), nil)
if err != nil {
return nil, err
}
// Gather location only if bucketName is present.
location := "us-east-1" // Default all other requests to "us-east-1".
if metadata.bucketName != "" {
@ -351,17 +353,32 @@ func (c Client) newRequest(method string, metadata requestMetadata) (*http.Reque
}
}
// If presigned request, return quickly.
if metadata.expires != 0 {
// Save location.
metadata.bucketLocation = location
// Construct a new target URL.
targetURL, err := c.makeTargetURL(metadata.bucketName, metadata.objectName, metadata.bucketLocation, metadata.queryValues)
if err != nil {
return nil, err
}
// Initialize a new HTTP request for the method.
req, err = http.NewRequest(method, targetURL.String(), nil)
if err != nil {
return nil, err
}
// Generate presign url if needed, return right here.
if metadata.expires != 0 && metadata.presignURL {
if c.anonymous {
return nil, ErrInvalidArgument("Requests cannot be presigned with anonymous credentials.")
}
if c.signature.isV2() {
// Presign URL with signature v2.
req = PreSignV2(*req, c.accessKeyID, c.secretAccessKey, metadata.expires)
req = preSignV2(*req, c.accessKeyID, c.secretAccessKey, metadata.expires)
} else {
// Presign URL with signature v4.
req = PreSignV4(*req, c.accessKeyID, c.secretAccessKey, location, metadata.expires)
req = preSignV4(*req, c.accessKeyID, c.secretAccessKey, location, metadata.expires)
}
return req, nil
}
@ -401,17 +418,18 @@ func (c Client) newRequest(method string, metadata requestMetadata) (*http.Reque
req.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(metadata.contentMD5Bytes))
}
// Sign the request if not anonymous.
// Sign the request for all authenticated requests.
if !c.anonymous {
if c.signature.isV2() {
// Add signature version '2' authorization header.
req = SignV2(*req, c.accessKeyID, c.secretAccessKey)
req = signV2(*req, c.accessKeyID, c.secretAccessKey)
} else if c.signature.isV4() {
// Add signature version '4' authorization header.
req = SignV4(*req, c.accessKeyID, c.secretAccessKey, location)
req = signV4(*req, c.accessKeyID, c.secretAccessKey, location)
}
}
// return request.
// Return request.
return req, nil
}
@ -424,24 +442,37 @@ func (c Client) setUserAgent(req *http.Request) {
}
// makeTargetURL make a new target url.
func (c Client) makeTargetURL(bucketName, objectName string, queryValues url.Values) (*url.URL, error) {
urlStr := c.endpointURL.Scheme + "://" + c.endpointURL.Host + "/"
func (c Client) makeTargetURL(bucketName, objectName, bucketLocation string, queryValues url.Values) (*url.URL, error) {
// Save host.
host := c.endpointURL.Host
// For Amazon S3 endpoint, try to fetch location based endpoint.
if isAmazonEndpoint(c.endpointURL) {
// Fetch new host based on the bucket location.
host = getS3Endpoint(bucketLocation)
}
// Save scheme.
scheme := c.endpointURL.Scheme
urlStr := scheme + "://" + host + "/"
// Make URL only if bucketName is available, otherwise use the
// endpoint URL.
if bucketName != "" {
// Save if target url will have buckets which suppport virtual host.
isVirtualHostStyle := isVirtualHostSupported(c.endpointURL, bucketName)
// If endpoint supports virtual host style use that always.
// Currently only S3 and Google Cloud Storage would support
// this.
if isVirtualHostSupported(c.endpointURL) {
urlStr = c.endpointURL.Scheme + "://" + bucketName + "." + c.endpointURL.Host + "/"
// virtual host style.
if isVirtualHostStyle {
urlStr = scheme + "://" + bucketName + "." + host + "/"
if objectName != "" {
urlStr = urlStr + urlEncodePath(objectName)
}
} else {
// If not fall back to using path style.
urlStr = urlStr + bucketName
urlStr = urlStr + bucketName + "/"
if objectName != "" {
urlStr = urlStr + "/" + urlEncodePath(objectName)
urlStr = urlStr + urlEncodePath(objectName)
}
}
}
@ -481,6 +512,9 @@ type CloudStorageClient interface {
FPutObject(bucketName, objectName, filePath, contentType string) (n int64, err error)
FGetObject(bucketName, objectName, filePath string) error
// PutObjectWithProgress for progress.
PutObjectWithProgress(bucketName, objectName string, reader io.Reader, contentType string, progress io.Reader) (n int64, err error)
// Presigned operations.
PresignedGetObject(bucketName, objectName string, expires time.Duration) (presignedURL string, err error)
PresignedPutObject(bucketName, objectName string, expires time.Duration) (presignedURL string, err error)
@ -493,6 +527,6 @@ type CloudStorageClient interface {
SetCustomTransport(customTransport http.RoundTripper)
// HTTP tracing methods.
TraceOn(traceOutput io.Writer) error
TraceOn(traceOutput io.Writer)
TraceOff()
}

View File

@ -1,835 +0,0 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio_test
import (
"bytes"
crand "crypto/rand"
"errors"
"io"
"io/ioutil"
"math/rand"
"net/http"
"os"
"testing"
"time"
"github.com/minio/minio-go"
)
func TestGetObjectClosedTwiceV2(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// Make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// Generate data more than 32K
buf := make([]byte, rand.Intn(1<<20)+32*1024)
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error:", err)
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
// Read the data back
r, err := c.GetObject(bucketName, objectName)
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
st, err := r.Stat()
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if st.Size != int64(len(buf)) {
t.Fatalf("Error: number of bytes in stat does not match, want %v, got %v\n",
len(buf), st.Size)
}
if err := r.Close(); err != nil {
t.Fatal("Error:", err)
}
if err := r.Close(); err == nil {
t.Fatal("Error: object is already closed, should return error")
}
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests removing partially uploaded objects.
func TestRemovePartiallyUploadedV2(t *testing.T) {
if testing.Short() {
t.Skip("skipping function tests for short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.NewV2(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Enable tracing, write to stdout.
// c.TraceOn(os.Stderr)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
reader, writer := io.Pipe()
go func() {
i := 0
for i < 25 {
_, err = io.CopyN(writer, crand.Reader, 128*1024)
if err != nil {
t.Fatal("Error:", err, bucketName)
}
i++
}
writer.CloseWithError(errors.New("Proactively closed to be verified later."))
}()
objectName := bucketName + "-resumable"
_, err = c.PutObject(bucketName, objectName, reader, "application/octet-stream")
if err == nil {
t.Fatal("Error: PutObject should fail.")
}
if err.Error() != "Proactively closed to be verified later." {
t.Fatal("Error:", err)
}
err = c.RemoveIncompleteUpload(bucketName, objectName)
if err != nil {
t.Fatal("Error:", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests resumable file based put object multipart upload.
func TestResumableFPutObjectV2(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Enable tracing, write to stdout.
// c.TraceOn(os.Stderr)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
file, err := ioutil.TempFile(os.TempDir(), "resumable")
if err != nil {
t.Fatal("Error:", err)
}
n, err := io.CopyN(file, crand.Reader, 11*1024*1024)
if err != nil {
t.Fatal("Error:", err)
}
if n != int64(11*1024*1024) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", 11*1024*1024, n)
}
objectName := bucketName + "-resumable"
n, err = c.FPutObject(bucketName, objectName, file.Name(), "application/octet-stream")
if err != nil {
t.Fatal("Error:", err)
}
if n != int64(11*1024*1024) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", 11*1024*1024, n)
}
// Close the file pro-actively for windows.
file.Close()
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
err = os.Remove(file.Name())
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests resumable put object multipart upload.
func TestResumablePutObjectV2(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// generate 11MB
buf := make([]byte, 11*1024*1024)
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error:", err)
}
objectName := bucketName + "-resumable"
reader := bytes.NewReader(buf)
n, err := c.PutObject(bucketName, objectName, reader, "application/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests get object ReaderSeeker interface methods.
func TestGetObjectReadSeekFunctionalV2(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// Make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// Generate data more than 32K
buf := make([]byte, rand.Intn(1<<20)+32*1024)
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error:", err)
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
// Read the data back
r, err := c.GetObject(bucketName, objectName)
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
st, err := r.Stat()
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if st.Size != int64(len(buf)) {
t.Fatalf("Error: number of bytes in stat does not match, want %v, got %v\n",
len(buf), st.Size)
}
offset := int64(2048)
n, err = r.Seek(offset, 0)
if err != nil {
t.Fatal("Error:", err, offset)
}
if n != offset {
t.Fatalf("Error: number of bytes seeked does not match, want %v, got %v\n",
offset, n)
}
n, err = r.Seek(0, 1)
if err != nil {
t.Fatal("Error:", err)
}
if n != offset {
t.Fatalf("Error: number of current seek does not match, want %v, got %v\n",
offset, n)
}
_, err = r.Seek(offset, 2)
if err == nil {
t.Fatal("Error: seek on positive offset for whence '2' should error out")
}
n, err = r.Seek(-offset, 2)
if err != nil {
t.Fatal("Error:", err)
}
if n != 0 {
t.Fatalf("Error: number of bytes seeked back does not match, want 0, got %v\n", n)
}
var buffer bytes.Buffer
if _, err = io.CopyN(&buffer, r, st.Size); err != nil {
t.Fatal("Error:", err)
}
if !bytes.Equal(buf, buffer.Bytes()) {
t.Fatal("Error: Incorrect read bytes v/s original buffer.")
}
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests get object ReaderAt interface methods.
func TestGetObjectReadAtFunctionalV2(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// Make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// Generate data more than 32K
buf := make([]byte, rand.Intn(1<<20)+32*1024)
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error:", err)
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
// Read the data back
r, err := c.GetObject(bucketName, objectName)
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
st, err := r.Stat()
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if st.Size != int64(len(buf)) {
t.Fatalf("Error: number of bytes in stat does not match, want %v, got %v\n",
len(buf), st.Size)
}
offset := int64(2048)
// Read directly
buf2 := make([]byte, 512)
buf3 := make([]byte, 512)
buf4 := make([]byte, 512)
m, err := r.ReadAt(buf2, offset)
if err != nil {
t.Fatal("Error:", err, st.Size, len(buf2), offset)
}
if m != len(buf2) {
t.Fatalf("Error: ReadAt read shorter bytes before reaching EOF, want %v, got %v\n", m, len(buf2))
}
if !bytes.Equal(buf2, buf[offset:offset+512]) {
t.Fatal("Error: Incorrect read between two ReadAt from same offset.")
}
offset += 512
m, err = r.ReadAt(buf3, offset)
if err != nil {
t.Fatal("Error:", err, st.Size, len(buf3), offset)
}
if m != len(buf3) {
t.Fatalf("Error: ReadAt read shorter bytes before reaching EOF, want %v, got %v\n", m, len(buf3))
}
if !bytes.Equal(buf3, buf[offset:offset+512]) {
t.Fatal("Error: Incorrect read between two ReadAt from same offset.")
}
offset += 512
m, err = r.ReadAt(buf4, offset)
if err != nil {
t.Fatal("Error:", err, st.Size, len(buf4), offset)
}
if m != len(buf4) {
t.Fatalf("Error: ReadAt read shorter bytes before reaching EOF, want %v, got %v\n", m, len(buf4))
}
if !bytes.Equal(buf4, buf[offset:offset+512]) {
t.Fatal("Error: Incorrect read between two ReadAt from same offset.")
}
buf5 := make([]byte, n)
// Read the whole object.
m, err = r.ReadAt(buf5, 0)
if err != nil {
if err != io.EOF {
t.Fatal("Error:", err, len(buf5))
}
}
if m != len(buf5) {
t.Fatalf("Error: ReadAt read shorter bytes before reaching EOF, want %v, got %v\n", m, len(buf5))
}
if !bytes.Equal(buf, buf5) {
t.Fatal("Error: Incorrect data read in GetObject, than what was previously upoaded.")
}
buf6 := make([]byte, n+1)
// Read the whole object and beyond.
_, err = r.ReadAt(buf6, 0)
if err != nil {
if err != io.EOF {
t.Fatal("Error:", err, len(buf6))
}
}
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests comprehensive list of all methods.
func TestFunctionalV2(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Enable to debug
// c.TraceOn(os.Stderr)
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// Make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// Generate a random file name.
fileName := randString(60, rand.NewSource(time.Now().UnixNano()))
file, err := os.Create(fileName)
if err != nil {
t.Fatal("Error:", err)
}
var totalSize int64
for i := 0; i < 3; i++ {
buf := make([]byte, rand.Intn(1<<19))
n, err := file.Write(buf)
if err != nil {
t.Fatal("Error:", err)
}
totalSize += int64(n)
}
file.Close()
// Verify if bucket exits and you have access.
err = c.BucketExists(bucketName)
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// Make the bucket 'public read/write'.
err = c.SetBucketACL(bucketName, "public-read-write")
if err != nil {
t.Fatal("Error:", err)
}
// Get the previously set acl.
acl, err := c.GetBucketACL(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
// ACL must be 'public read/write'.
if acl != minio.BucketACL("public-read-write") {
t.Fatal("Error:", acl)
}
// List all buckets.
buckets, err := c.ListBuckets()
if len(buckets) == 0 {
t.Fatal("Error: list buckets cannot be empty", buckets)
}
if err != nil {
t.Fatal("Error:", err)
}
// Verify if previously created bucket is listed in list buckets.
bucketFound := false
for _, bucket := range buckets {
if bucket.Name == bucketName {
bucketFound = true
}
}
// If bucket not found error out.
if !bucketFound {
t.Fatal("Error: bucket ", bucketName, "not found")
}
objectName := bucketName + "unique"
// Generate data
buf := make([]byte, rand.Intn(1<<19))
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error: ", err)
}
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "")
if err != nil {
t.Fatal("Error: ", err)
}
if n != int64(len(buf)) {
t.Fatal("Error: bad length ", n, len(buf))
}
n, err = c.PutObject(bucketName, objectName+"-nolength", bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName+"-nolength")
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
// Instantiate a done channel to close all listing.
doneCh := make(chan struct{})
defer close(doneCh)
objFound := false
isRecursive := true // Recursive is true.
for obj := range c.ListObjects(bucketName, objectName, isRecursive, doneCh) {
if obj.Key == objectName {
objFound = true
break
}
}
if !objFound {
t.Fatal("Error: object " + objectName + " not found.")
}
incompObjNotFound := true
for objIncompl := range c.ListIncompleteUploads(bucketName, objectName, isRecursive, doneCh) {
if objIncompl.Key != "" {
incompObjNotFound = false
break
}
}
if !incompObjNotFound {
t.Fatal("Error: unexpected dangling incomplete upload found.")
}
newReader, err := c.GetObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
newReadBytes, err := ioutil.ReadAll(newReader)
if err != nil {
t.Fatal("Error: ", err)
}
if !bytes.Equal(newReadBytes, buf) {
t.Fatal("Error: bytes mismatch.")
}
err = c.FGetObject(bucketName, objectName, fileName+"-f")
if err != nil {
t.Fatal("Error: ", err)
}
presignedGetURL, err := c.PresignedGetObject(bucketName, objectName, 3600*time.Second)
if err != nil {
t.Fatal("Error: ", err)
}
resp, err := http.Get(presignedGetURL)
if err != nil {
t.Fatal("Error: ", err)
}
if resp.StatusCode != http.StatusOK {
t.Fatal("Error: ", resp.Status)
}
newPresignedBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal("Error: ", err)
}
if !bytes.Equal(newPresignedBytes, buf) {
t.Fatal("Error: bytes mismatch.")
}
presignedPutURL, err := c.PresignedPutObject(bucketName, objectName+"-presigned", 3600*time.Second)
if err != nil {
t.Fatal("Error: ", err)
}
buf = make([]byte, rand.Intn(1<<20))
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error: ", err)
}
req, err := http.NewRequest("PUT", presignedPutURL, bytes.NewReader(buf))
if err != nil {
t.Fatal("Error: ", err)
}
httpClient := &http.Client{}
resp, err = httpClient.Do(req)
if err != nil {
t.Fatal("Error: ", err)
}
newReader, err = c.GetObject(bucketName, objectName+"-presigned")
if err != nil {
t.Fatal("Error: ", err)
}
newReadBytes, err = ioutil.ReadAll(newReader)
if err != nil {
t.Fatal("Error: ", err)
}
if !bytes.Equal(newReadBytes, buf) {
t.Fatal("Error: bytes mismatch.")
}
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveObject(bucketName, objectName+"-f")
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveObject(bucketName, objectName+"-nolength")
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveObject(bucketName, objectName+"-presigned")
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
err = c.RemoveBucket(bucketName)
if err == nil {
t.Fatal("Error:")
}
if err.Error() != "The specified bucket does not exist" {
t.Fatal("Error: ", err)
}
if err = os.Remove(fileName); err != nil {
t.Fatal("Error: ", err)
}
if err = os.Remove(fileName + "-f"); err != nil {
t.Fatal("Error: ", err)
}
}

View File

@ -1,859 +0,0 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio_test
import (
"bytes"
crand "crypto/rand"
"errors"
"io"
"io/ioutil"
"math/rand"
"net/http"
"os"
"testing"
"time"
"github.com/minio/minio-go"
)
const letterBytes = "abcdefghijklmnopqrstuvwxyz01234569"
const (
letterIdxBits = 6 // 6 bits to represent a letter index
letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
)
func randString(n int, src rand.Source) string {
b := make([]byte, n)
// A rand.Int63() generates 63 random bits, enough for letterIdxMax letters!
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
if remain == 0 {
cache, remain = src.Int63(), letterIdxMax
}
if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
b[i] = letterBytes[idx]
i--
}
cache >>= letterIdxBits
remain--
}
return string(b[0:30])
}
func TestGetObjectClosedTwice(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// Make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// Generate data more than 32K
buf := make([]byte, rand.Intn(1<<20)+32*1024)
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error:", err)
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
// Read the data back
r, err := c.GetObject(bucketName, objectName)
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
st, err := r.Stat()
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if st.Size != int64(len(buf)) {
t.Fatalf("Error: number of bytes in stat does not match, want %v, got %v\n",
len(buf), st.Size)
}
if err := r.Close(); err != nil {
t.Fatal("Error:", err)
}
if err := r.Close(); err == nil {
t.Fatal("Error: object is already closed, should return error")
}
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests removing partially uploaded objects.
func TestRemovePartiallyUploaded(t *testing.T) {
if testing.Short() {
t.Skip("skipping function tests for short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Enable tracing, write to stdout.
// c.TraceOn(os.Stderr)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// Make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
reader, writer := io.Pipe()
go func() {
i := 0
for i < 25 {
_, err = io.CopyN(writer, crand.Reader, 128*1024)
if err != nil {
t.Fatal("Error:", err, bucketName)
}
i++
}
writer.CloseWithError(errors.New("Proactively closed to be verified later."))
}()
objectName := bucketName + "-resumable"
_, err = c.PutObject(bucketName, objectName, reader, "application/octet-stream")
if err == nil {
t.Fatal("Error: PutObject should fail.")
}
if err.Error() != "Proactively closed to be verified later." {
t.Fatal("Error:", err)
}
err = c.RemoveIncompleteUpload(bucketName, objectName)
if err != nil {
t.Fatal("Error:", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests resumable file based put object multipart upload.
func TestResumableFPutObject(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Enable tracing, write to stdout.
// c.TraceOn(os.Stderr)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// Make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
file, err := ioutil.TempFile(os.TempDir(), "resumable")
if err != nil {
t.Fatal("Error:", err)
}
n, err := io.CopyN(file, crand.Reader, 11*1024*1024)
if err != nil {
t.Fatal("Error:", err)
}
if n != int64(11*1024*1024) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", 11*1024*1024, n)
}
objectName := bucketName + "-resumable"
n, err = c.FPutObject(bucketName, objectName, file.Name(), "application/octet-stream")
if err != nil {
t.Fatal("Error:", err)
}
if n != int64(11*1024*1024) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", 11*1024*1024, n)
}
// Close the file pro-actively for windows.
file.Close()
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
err = os.Remove(file.Name())
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests resumable put object multipart upload.
func TestResumablePutObject(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// Make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// Generate 11MB
buf := make([]byte, 11*1024*1024)
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error:", err)
}
objectName := bucketName + "-resumable"
reader := bytes.NewReader(buf)
n, err := c.PutObject(bucketName, objectName, reader, "application/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests get object ReaderSeeker interface methods.
func TestGetObjectReadSeekFunctional(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// Make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// Generate data more than 32K
buf := make([]byte, rand.Intn(1<<20)+32*1024)
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error:", err)
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
// Read the data back
r, err := c.GetObject(bucketName, objectName)
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
st, err := r.Stat()
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if st.Size != int64(len(buf)) {
t.Fatalf("Error: number of bytes in stat does not match, want %v, got %v\n",
len(buf), st.Size)
}
offset := int64(2048)
n, err = r.Seek(offset, 0)
if err != nil {
t.Fatal("Error:", err, offset)
}
if n != offset {
t.Fatalf("Error: number of bytes seeked does not match, want %v, got %v\n",
offset, n)
}
n, err = r.Seek(0, 1)
if err != nil {
t.Fatal("Error:", err)
}
if n != offset {
t.Fatalf("Error: number of current seek does not match, want %v, got %v\n",
offset, n)
}
_, err = r.Seek(offset, 2)
if err == nil {
t.Fatal("Error: seek on positive offset for whence '2' should error out")
}
n, err = r.Seek(-offset, 2)
if err != nil {
t.Fatal("Error:", err)
}
if n != 0 {
t.Fatalf("Error: number of bytes seeked back does not match, want 0, got %v\n", n)
}
var buffer bytes.Buffer
if _, err = io.CopyN(&buffer, r, st.Size); err != nil {
t.Fatal("Error:", err)
}
if !bytes.Equal(buf, buffer.Bytes()) {
t.Fatal("Error: Incorrect read bytes v/s original buffer.")
}
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests get object ReaderAt interface methods.
func TestGetObjectReadAtFunctional(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// Make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// Generate data more than 32K
buf := make([]byte, rand.Intn(1<<20)+32*1024)
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error:", err)
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
// read the data back
r, err := c.GetObject(bucketName, objectName)
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
st, err := r.Stat()
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if st.Size != int64(len(buf)) {
t.Fatalf("Error: number of bytes in stat does not match, want %v, got %v\n",
len(buf), st.Size)
}
offset := int64(2048)
// read directly
buf2 := make([]byte, 512)
buf3 := make([]byte, 512)
buf4 := make([]byte, 512)
m, err := r.ReadAt(buf2, offset)
if err != nil {
t.Fatal("Error:", err, st.Size, len(buf2), offset)
}
if m != len(buf2) {
t.Fatalf("Error: ReadAt read shorter bytes before reaching EOF, want %v, got %v\n", m, len(buf2))
}
if !bytes.Equal(buf2, buf[offset:offset+512]) {
t.Fatal("Error: Incorrect read between two ReadAt from same offset.")
}
offset += 512
m, err = r.ReadAt(buf3, offset)
if err != nil {
t.Fatal("Error:", err, st.Size, len(buf3), offset)
}
if m != len(buf3) {
t.Fatalf("Error: ReadAt read shorter bytes before reaching EOF, want %v, got %v\n", m, len(buf3))
}
if !bytes.Equal(buf3, buf[offset:offset+512]) {
t.Fatal("Error: Incorrect read between two ReadAt from same offset.")
}
offset += 512
m, err = r.ReadAt(buf4, offset)
if err != nil {
t.Fatal("Error:", err, st.Size, len(buf4), offset)
}
if m != len(buf4) {
t.Fatalf("Error: ReadAt read shorter bytes before reaching EOF, want %v, got %v\n", m, len(buf4))
}
if !bytes.Equal(buf4, buf[offset:offset+512]) {
t.Fatal("Error: Incorrect read between two ReadAt from same offset.")
}
buf5 := make([]byte, n)
// Read the whole object.
m, err = r.ReadAt(buf5, 0)
if err != nil {
if err != io.EOF {
t.Fatal("Error:", err, len(buf5))
}
}
if m != len(buf5) {
t.Fatalf("Error: ReadAt read shorter bytes before reaching EOF, want %v, got %v\n", m, len(buf5))
}
if !bytes.Equal(buf, buf5) {
t.Fatal("Error: Incorrect data read in GetObject, than what was previously upoaded.")
}
buf6 := make([]byte, n+1)
// Read the whole object and beyond.
_, err = r.ReadAt(buf6, 0)
if err != nil {
if err != io.EOF {
t.Fatal("Error:", err, len(buf6))
}
}
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests comprehensive list of all methods.
func TestFunctional(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Enable to debug
// c.TraceOn(os.Stderr)
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// Make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// Generate a random file name.
fileName := randString(60, rand.NewSource(time.Now().UnixNano()))
file, err := os.Create(fileName)
if err != nil {
t.Fatal("Error:", err)
}
var totalSize int64
for i := 0; i < 3; i++ {
buf := make([]byte, rand.Intn(1<<19))
n, err := file.Write(buf)
if err != nil {
t.Fatal("Error:", err)
}
totalSize += int64(n)
}
file.Close()
// Verify if bucket exits and you have access.
err = c.BucketExists(bucketName)
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// Make the bucket 'public read/write'.
err = c.SetBucketACL(bucketName, "public-read-write")
if err != nil {
t.Fatal("Error:", err)
}
// Get the previously set acl.
acl, err := c.GetBucketACL(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
// ACL must be 'public read/write'.
if acl != minio.BucketACL("public-read-write") {
t.Fatal("Error:", acl)
}
// List all buckets.
buckets, err := c.ListBuckets()
if len(buckets) == 0 {
t.Fatal("Error: list buckets cannot be empty", buckets)
}
if err != nil {
t.Fatal("Error:", err)
}
// Verify if previously created bucket is listed in list buckets.
bucketFound := false
for _, bucket := range buckets {
if bucket.Name == bucketName {
bucketFound = true
}
}
// If bucket not found error out.
if !bucketFound {
t.Fatal("Error: bucket ", bucketName, "not found")
}
objectName := bucketName + "unique"
// Generate data
buf := make([]byte, rand.Intn(1<<19))
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error: ", err)
}
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "")
if err != nil {
t.Fatal("Error: ", err)
}
if n != int64(len(buf)) {
t.Fatal("Error: bad length ", n, len(buf))
}
n, err = c.PutObject(bucketName, objectName+"-nolength", bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName+"-nolength")
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
// Instantiate a done channel to close all listing.
doneCh := make(chan struct{})
defer close(doneCh)
objFound := false
isRecursive := true // Recursive is true.
for obj := range c.ListObjects(bucketName, objectName, isRecursive, doneCh) {
if obj.Key == objectName {
objFound = true
break
}
}
if !objFound {
t.Fatal("Error: object " + objectName + " not found.")
}
incompObjNotFound := true
for objIncompl := range c.ListIncompleteUploads(bucketName, objectName, isRecursive, doneCh) {
if objIncompl.Key != "" {
incompObjNotFound = false
break
}
}
if !incompObjNotFound {
t.Fatal("Error: unexpected dangling incomplete upload found.")
}
newReader, err := c.GetObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
newReadBytes, err := ioutil.ReadAll(newReader)
if err != nil {
t.Fatal("Error: ", err)
}
if !bytes.Equal(newReadBytes, buf) {
t.Fatal("Error: bytes mismatch.")
}
err = c.FGetObject(bucketName, objectName, fileName+"-f")
if err != nil {
t.Fatal("Error: ", err)
}
presignedGetURL, err := c.PresignedGetObject(bucketName, objectName, 3600*time.Second)
if err != nil {
t.Fatal("Error: ", err)
}
resp, err := http.Get(presignedGetURL)
if err != nil {
t.Fatal("Error: ", err)
}
if resp.StatusCode != http.StatusOK {
t.Fatal("Error: ", resp.Status)
}
newPresignedBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal("Error: ", err)
}
if !bytes.Equal(newPresignedBytes, buf) {
t.Fatal("Error: bytes mismatch.")
}
presignedPutURL, err := c.PresignedPutObject(bucketName, objectName+"-presigned", 3600*time.Second)
if err != nil {
t.Fatal("Error: ", err)
}
buf = make([]byte, rand.Intn(1<<20))
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error: ", err)
}
req, err := http.NewRequest("PUT", presignedPutURL, bytes.NewReader(buf))
if err != nil {
t.Fatal("Error: ", err)
}
httpClient := &http.Client{}
resp, err = httpClient.Do(req)
if err != nil {
t.Fatal("Error: ", err)
}
newReader, err = c.GetObject(bucketName, objectName+"-presigned")
if err != nil {
t.Fatal("Error: ", err)
}
newReadBytes, err = ioutil.ReadAll(newReader)
if err != nil {
t.Fatal("Error: ", err)
}
if !bytes.Equal(newReadBytes, buf) {
t.Fatal("Error: bytes mismatch.")
}
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveObject(bucketName, objectName+"-f")
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveObject(bucketName, objectName+"-nolength")
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveObject(bucketName, objectName+"-presigned")
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
err = c.RemoveBucket(bucketName)
if err == nil {
t.Fatal("Error:")
}
if err.Error() != "The specified bucket does not exist" {
t.Fatal("Error: ", err)
}
if err = os.Remove(fileName); err != nil {
t.Fatal("Error: ", err)
}
if err = os.Remove(fileName + "-f"); err != nil {
t.Fatal("Error: ", err)
}
}

View File

@ -1,364 +0,0 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
import (
"fmt"
"net/http"
"net/url"
"strings"
"testing"
)
func TestEncodeURL2Path(t *testing.T) {
type urlStrings struct {
objName string
encodedObjName string
}
bucketName := "bucketName"
want := []urlStrings{
{
objName: "本語",
encodedObjName: "%E6%9C%AC%E8%AA%9E",
},
{
objName: "本語.1",
encodedObjName: "%E6%9C%AC%E8%AA%9E.1",
},
{
objName: ">123>3123123",
encodedObjName: "%3E123%3E3123123",
},
{
objName: "test 1 2.txt",
encodedObjName: "test%201%202.txt",
},
{
objName: "test++ 1.txt",
encodedObjName: "test%2B%2B%201.txt",
},
}
for _, o := range want {
u, err := url.Parse(fmt.Sprintf("https://%s.s3.amazonaws.com/%s", bucketName, o.objName))
if err != nil {
t.Fatal("Error:", err)
}
urlPath := "/" + bucketName + "/" + o.encodedObjName
if urlPath != encodeURL2Path(u) {
t.Fatal("Error")
}
}
}
func TestErrorResponse(t *testing.T) {
var err error
err = ErrorResponse{
Code: "Testing",
}
errResp := ToErrorResponse(err)
if errResp.Code != "Testing" {
t.Fatal("Type conversion failed, we have an empty struct.")
}
// Test http response decoding.
var httpResponse *http.Response
// Set empty variables
httpResponse = nil
var bucketName, objectName string
// Should fail with invalid argument.
err = HTTPRespToErrorResponse(httpResponse, bucketName, objectName)
errResp = ToErrorResponse(err)
if errResp.Code != "InvalidArgument" {
t.Fatal("Empty response input should return invalid argument.")
}
}
func TestSignatureCalculation(t *testing.T) {
req, err := http.NewRequest("GET", "https://s3.amazonaws.com", nil)
if err != nil {
t.Fatal("Error:", err)
}
req = SignV4(*req, "", "", "us-east-1")
if req.Header.Get("Authorization") != "" {
t.Fatal("Error: anonymous credentials should not have Authorization header.")
}
req = PreSignV4(*req, "", "", "us-east-1", 0)
if strings.Contains(req.URL.RawQuery, "X-Amz-Signature") {
t.Fatal("Error: anonymous credentials should not have Signature query resource.")
}
req = SignV2(*req, "", "")
if req.Header.Get("Authorization") != "" {
t.Fatal("Error: anonymous credentials should not have Authorization header.")
}
req = PreSignV2(*req, "", "", 0)
if strings.Contains(req.URL.RawQuery, "Signature") {
t.Fatal("Error: anonymous credentials should not have Signature query resource.")
}
req = SignV4(*req, "ACCESS-KEY", "SECRET-KEY", "us-east-1")
if req.Header.Get("Authorization") == "" {
t.Fatal("Error: normal credentials should have Authorization header.")
}
req = PreSignV4(*req, "ACCESS-KEY", "SECRET-KEY", "us-east-1", 0)
if !strings.Contains(req.URL.RawQuery, "X-Amz-Signature") {
t.Fatal("Error: normal credentials should have Signature query resource.")
}
req = SignV2(*req, "ACCESS-KEY", "SECRET-KEY")
if req.Header.Get("Authorization") == "" {
t.Fatal("Error: normal credentials should have Authorization header.")
}
req = PreSignV2(*req, "ACCESS-KEY", "SECRET-KEY", 0)
if !strings.Contains(req.URL.RawQuery, "Signature") {
t.Fatal("Error: normal credentials should not have Signature query resource.")
}
}
func TestSignatureType(t *testing.T) {
clnt := Client{}
if !clnt.signature.isV4() {
t.Fatal("Error")
}
clnt.signature = SignatureV2
if !clnt.signature.isV2() {
t.Fatal("Error")
}
if clnt.signature.isV4() {
t.Fatal("Error")
}
clnt.signature = SignatureV4
if !clnt.signature.isV4() {
t.Fatal("Error")
}
}
func TestACLTypes(t *testing.T) {
want := map[string]bool{
"private": true,
"public-read": true,
"public-read-write": true,
"authenticated-read": true,
"invalid": false,
}
for acl, ok := range want {
if BucketACL(acl).isValidBucketACL() != ok {
t.Fatal("Error")
}
}
}
func TestPartSize(t *testing.T) {
var maxPartSize int64 = 1024 * 1024 * 1024 * 5
partSize := optimalPartSize(5000000000000000000)
if partSize > minimumPartSize {
if partSize > maxPartSize {
t.Fatal("invalid result, cannot be bigger than maxPartSize 5GiB")
}
}
partSize = optimalPartSize(50000000000)
if partSize > minimumPartSize {
t.Fatal("invalid result, cannot be bigger than minimumPartSize 5MiB")
}
}
func TestURLEncoding(t *testing.T) {
type urlStrings struct {
name string
encodedName string
}
want := []urlStrings{
{
name: "bigfile-1._%",
encodedName: "bigfile-1._%25",
},
{
name: "本語",
encodedName: "%E6%9C%AC%E8%AA%9E",
},
{
name: "本語.1",
encodedName: "%E6%9C%AC%E8%AA%9E.1",
},
{
name: ">123>3123123",
encodedName: "%3E123%3E3123123",
},
{
name: "test 1 2.txt",
encodedName: "test%201%202.txt",
},
{
name: "test++ 1.txt",
encodedName: "test%2B%2B%201.txt",
},
}
for _, u := range want {
if u.encodedName != urlEncodePath(u.name) {
t.Fatal("Error")
}
}
}
func TestGetEndpointURL(t *testing.T) {
if _, err := getEndpointURL("s3.amazonaws.com", false); err != nil {
t.Fatal("Error:", err)
}
if _, err := getEndpointURL("192.168.1.1", false); err != nil {
t.Fatal("Error:", err)
}
if _, err := getEndpointURL("13333.123123.-", false); err == nil {
t.Fatal("Error")
}
if _, err := getEndpointURL("s3.aamzza.-", false); err == nil {
t.Fatal("Error")
}
if _, err := getEndpointURL("s3.amazonaws.com:443", false); err == nil {
t.Fatal("Error")
}
}
func TestValidIP(t *testing.T) {
type validIP struct {
ip string
valid bool
}
want := []validIP{
{
ip: "192.168.1.1",
valid: true,
},
{
ip: "192.1.8",
valid: false,
},
{
ip: "..192.",
valid: false,
},
{
ip: "192.168.1.1.1",
valid: false,
},
}
for _, w := range want {
valid := isValidIP(w.ip)
if valid != w.valid {
t.Fatal("Error")
}
}
}
func TestValidEndpointDomain(t *testing.T) {
type validEndpoint struct {
endpointDomain string
valid bool
}
want := []validEndpoint{
{
endpointDomain: "s3.amazonaws.com",
valid: true,
},
{
endpointDomain: "s3.amazonaws.com_",
valid: false,
},
{
endpointDomain: "%$$$",
valid: false,
},
{
endpointDomain: "s3.amz.test.com",
valid: true,
},
{
endpointDomain: "s3.%%",
valid: false,
},
{
endpointDomain: "localhost",
valid: true,
},
{
endpointDomain: "-localhost",
valid: false,
},
{
endpointDomain: "",
valid: false,
},
{
endpointDomain: "\n \t",
valid: false,
},
{
endpointDomain: " ",
valid: false,
},
}
for _, w := range want {
valid := isValidDomain(w.endpointDomain)
if valid != w.valid {
t.Fatal("Error:", w.endpointDomain)
}
}
}
func TestValidEndpointURL(t *testing.T) {
type validURL struct {
url string
valid bool
}
want := []validURL{
{
url: "https://s3.amazonaws.com",
valid: true,
},
{
url: "https://s3.amazonaws.com/bucket/object",
valid: false,
},
{
url: "192.168.1.1",
valid: false,
},
}
for _, w := range want {
u, err := url.Parse(w.url)
if err != nil {
t.Fatal("Error:", err)
}
valid := false
if err := isValidEndpointURL(u); err == nil {
valid = true
}
if valid != w.valid {
t.Fatal("Error")
}
}
}

View File

@ -90,7 +90,7 @@ func (c Client) getBucketLocation(bucketName string) (string, error) {
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return "", HTTPRespToErrorResponse(resp, bucketName, "")
return "", httpRespToErrorResponse(resp, bucketName, "")
}
}
@ -127,7 +127,7 @@ func (c Client) getBucketLocationRequest(bucketName string) (*http.Request, erro
// Set get bucket location always as path style.
targetURL := c.endpointURL
targetURL.Path = filepath.Join(bucketName, "")
targetURL.Path = filepath.Join(bucketName, "") + "/"
targetURL.RawQuery = urlValues.Encode()
// Get a new HTTP request for the method.
@ -146,9 +146,9 @@ func (c Client) getBucketLocationRequest(bucketName string) (*http.Request, erro
// Sign the request.
if c.signature.isV4() {
req = SignV4(*req, c.accessKeyID, c.secretAccessKey, "us-east-1")
req = signV4(*req, c.accessKeyID, c.secretAccessKey, "us-east-1")
} else if c.signature.isV2() {
req = SignV2(*req, c.accessKeyID, c.secretAccessKey)
req = signV2(*req, c.accessKeyID, c.secretAccessKey)
}
return req, nil
}

View File

@ -18,12 +18,12 @@ package minio
/// Multipart upload defaults.
// minimumPartSize - minimum part size 5MiB per object after which
// miniPartSize - minimum part size 5MiB per object after which
// putObject behaves internally as multipart.
const minimumPartSize = 1024 * 1024 * 5
const minPartSize = 1024 * 1024 * 5
// maxParts - maximum parts for a single multipart session.
const maxParts = 10000
// maxPartsCount - maximum number of parts for a single multipart session.
const maxPartsCount = 10000
// maxPartSize - maximum part size 5GiB for a single multipart upload
// operation.
@ -37,6 +37,6 @@ const maxSinglePutObjectSize = 1024 * 1024 * 1024 * 5
// Multipart operation.
const maxMultipartPutObjectSize = 1024 * 1024 * 1024 * 1024 * 5
// optimalReadAtBufferSize - optimal buffer 5MiB used for reading
// through ReadAt operation.
const optimalReadAtBufferSize = 1024 * 1024 * 5
// optimalReadBufferSize - optimal buffer 5MiB used for reading
// through Read operation.
const optimalReadBufferSize = 1024 * 1024 * 5

View File

@ -45,12 +45,6 @@ func main() {
}
defer reader.Close()
reader, err := s3Client.GetObject("my-bucketname", "my-objectname")
if err != nil {
log.Fatalln(err)
}
defer reader.Close()
localFile, err := os.Create("my-testfile")
if err != nil {
log.Fatalln(err)

View File

@ -39,7 +39,7 @@ func main() {
}
// Create a done channel to control 'ListObjects' go routine.
doneCh := make(struct{})
doneCh := make(chan struct{})
// Indicate to our routine to exit cleanly upon return.
defer close(doneCh)

View File

@ -39,7 +39,7 @@ func main() {
}
// Create a done channel to control 'ListObjects' go routine.
doneCh := make(struct{})
doneCh := make(chan struct{})
// Indicate to our routine to exit cleanly upon return.
defer close(doneCh)

View File

@ -40,7 +40,7 @@ func main() {
}
// Create a done channel to control 'ListObjects' go routine.
doneCh := make(struct{})
doneCh := make(chan struct{})
// Indicate to our routine to exit cleanly upon return.
defer close(doneCh)

View File

@ -40,7 +40,7 @@ func main() {
}
// Create a done channel to control 'ListObjects' go routine.
doneCh := make(struct{})
doneCh := make(chan struct{})
// Indicate to our routine to exit cleanly upon return.
defer close(doneCh)

View File

@ -0,0 +1,64 @@
// +build ignore
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"log"
"github.com/minio/minio-go"
"github.com/minio/pb"
)
func main() {
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY, my-testfile, my-bucketname and
// my-objectname are dummy values, please replace them with original values.
// Requests are always secure (HTTPS) by default. Set insecure=true to enable insecure (HTTP) access.
// This boolean value is the last argument for New().
// New returns an Amazon S3 compatible client object. API copatibality (v2 or v4) is automatically
// determined based on the Endpoint value.
s3Client, err := minio.New("s3.amazonaws.com", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", false)
if err != nil {
log.Fatalln(err)
}
reader, err := s3Client.GetObject("my-bucketname", "my-objectname")
if err != nil {
log.Fatalln(err)
}
defer reader.Close()
objectInfo, err := reader.Stat()
if err != nil {
log.Fatalln(err)
}
// progress reader is notified as PutObject makes progress with
// the read. For partial resume put object, progress reader is
// appropriately advanced.
progress := pb.New64(objectInfo.Size)
progress.Start()
n, err := s3Client.PutObjectWithProgress("my-bucketname", "my-objectname-progress", reader, "application/octet-stream", progress)
if err != nil {
log.Fatalln(err)
}
log.Println("Uploaded", "my-objectname", " of size: ", n, "Successfully.")
}

View File

@ -0,0 +1,54 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
import "io"
// hookReader hooks additional reader in the source stream. It is
// useful for making progress bars. Second reader is appropriately
// notified about the exact number of bytes read from the primary
// source on each Read operation.
type hookReader struct {
source io.Reader
hook io.Reader
}
// Read implements io.Reader. Always reads from the source, the return
// value 'n' number of bytes are reported through the hook. Returns
// error for all non io.EOF conditions.
func (hr *hookReader) Read(b []byte) (n int, err error) {
n, err = hr.source.Read(b)
if err != nil && err != io.EOF {
return n, err
}
// Progress the hook with the total read bytes from the source.
if _, herr := hr.hook.Read(b[:n]); herr != nil {
if herr != io.EOF {
return n, herr
}
}
return n, err
}
// newHook returns a io.Reader which implements hookReader that
// reports the data read from the source to the hook.
func newHook(source, hook io.Reader) io.Reader {
if hook == nil {
return source
}
return &hookReader{source, hook}
}

View File

@ -24,6 +24,7 @@ import (
"fmt"
"net/http"
"net/url"
"path/filepath"
"sort"
"strconv"
"strings"
@ -38,8 +39,11 @@ const (
// Encode input URL path to URL encoded path.
func encodeURL2Path(u *url.URL) (path string) {
// Encode URL path.
if strings.HasSuffix(u.Host, ".s3.amazonaws.com") {
path = "/" + strings.TrimSuffix(u.Host, ".s3.amazonaws.com")
if isS3, _ := filepath.Match("*.s3*.amazonaws.com", u.Host); isS3 {
hostSplits := strings.SplitN(u.Host, ".", 4)
// First element is the bucket name.
bucketName := hostSplits[0]
path = "/" + bucketName
path += u.Path
path = urlEncodePath(path)
return
@ -54,9 +58,9 @@ func encodeURL2Path(u *url.URL) (path string) {
return
}
// PreSignV2 - presign the request in following style.
// preSignV2 - presign the request in following style.
// https://${S3_BUCKET}.s3.amazonaws.com/${S3_OBJECT}?AWSAccessKeyId=${S3_ACCESS_KEY}&Expires=${TIMESTAMP}&Signature=${SIGNATURE}.
func PreSignV2(req http.Request, accessKeyID, secretAccessKey string, expires int64) *http.Request {
func preSignV2(req http.Request, accessKeyID, secretAccessKey string, expires int64) *http.Request {
// Presign is not needed for anonymous credentials.
if accessKeyID == "" || secretAccessKey == "" {
return &req
@ -98,9 +102,9 @@ func PreSignV2(req http.Request, accessKeyID, secretAccessKey string, expires in
return &req
}
// PostPresignSignatureV2 - presigned signature for PostPolicy
// postPresignSignatureV2 - presigned signature for PostPolicy
// request.
func PostPresignSignatureV2(policyBase64, secretAccessKey string) string {
func postPresignSignatureV2(policyBase64, secretAccessKey string) string {
hm := hmac.New(sha1.New, []byte(secretAccessKey))
hm.Write([]byte(policyBase64))
signature := base64.StdEncoding.EncodeToString(hm.Sum(nil))
@ -123,8 +127,8 @@ func PostPresignSignatureV2(policyBase64, secretAccessKey string) string {
//
// CanonicalizedProtocolHeaders = <described below>
// SignV2 sign the request before Do() (AWS Signature Version 2).
func SignV2(req http.Request, accessKeyID, secretAccessKey string) *http.Request {
// signV2 sign the request before Do() (AWS Signature Version 2).
func signV2(req http.Request, accessKeyID, secretAccessKey string) *http.Request {
// Signature calculation is not needed for anonymous credentials.
if accessKeyID == "" || secretAccessKey == "" {
return &req
@ -251,10 +255,9 @@ var resourceList = []string{
// CanonicalizedResource = [ "/" + Bucket ] +
// <HTTP-Request-URI, from the protocol name up to the query string> +
// [ sub-resource, if present. For example "?acl", "?location", "?logging", or "?torrent"];
func writeCanonicalizedResource(buf *bytes.Buffer, req http.Request) error {
func writeCanonicalizedResource(buf *bytes.Buffer, req http.Request) {
// Save request URL.
requestURL := req.URL
// Get encoded URL path.
path := encodeURL2Path(requestURL)
buf.WriteString(path)
@ -285,5 +288,4 @@ func writeCanonicalizedResource(buf *bytes.Buffer, req http.Request) error {
}
}
}
return nil
}

View File

@ -202,9 +202,9 @@ func getStringToSignV4(t time.Time, location, canonicalRequest string) string {
return stringToSign
}
// PreSignV4 presign the request, in accordance with
// preSignV4 presign the request, in accordance with
// http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html.
func PreSignV4(req http.Request, accessKeyID, secretAccessKey, location string, expires int64) *http.Request {
func preSignV4(req http.Request, accessKeyID, secretAccessKey, location string, expires int64) *http.Request {
// Presign is not needed for anonymous credentials.
if accessKeyID == "" || secretAccessKey == "" {
return &req
@ -246,9 +246,9 @@ func PreSignV4(req http.Request, accessKeyID, secretAccessKey, location string,
return &req
}
// PostPresignSignatureV4 - presigned signature for PostPolicy
// postPresignSignatureV4 - presigned signature for PostPolicy
// requests.
func PostPresignSignatureV4(policyBase64 string, t time.Time, secretAccessKey, location string) string {
func postPresignSignatureV4(policyBase64 string, t time.Time, secretAccessKey, location string) string {
// Get signining key.
signingkey := getSigningKey(secretAccessKey, location, t)
// Calculate signature.
@ -256,9 +256,9 @@ func PostPresignSignatureV4(policyBase64 string, t time.Time, secretAccessKey, l
return signature
}
// SignV4 sign the request before Do(), in accordance with
// signV4 sign the request before Do(), in accordance with
// http://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html.
func SignV4(req http.Request, accessKeyID, secretAccessKey, location string) *http.Request {
func signV4(req http.Request, accessKeyID, secretAccessKey, location string) *http.Request {
// Signature calculation is not needed for anonymous credentials.
if accessKeyID == "" || secretAccessKey == "" {
return &req

View File

@ -0,0 +1,40 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
// awsS3EndpointMap Amazon S3 endpoint map.
var awsS3EndpointMap = map[string]string{
"us-east-1": "s3.amazonaws.com",
"us-west-2": "s3-us-west-2.amazonaws.com",
"us-west-1": "s3-us-west-1.amazonaws.com",
"eu-west-1": "s3-eu-west-1.amazonaws.com",
"eu-central-1": "s3-eu-central-1.amazonaws.com",
"ap-southeast-1": "s3-ap-southeast-1.amazonaws.com",
"ap-northeast-1": "s3-ap-northeast-1.amazonaws.com",
"ap-northeast-2": "s3-ap-northeast-2.amazonaws.com",
"sa-east-1": "s3-sa-east-1.amazonaws.com",
}
// getS3Endpoint get Amazon S3 endpoint based on the bucket location.
func getS3Endpoint(bucketLocation string) (s3Endpoint string) {
s3Endpoint, ok := awsS3EndpointMap[bucketLocation]
if !ok {
// Default to 's3.amazonaws.com' endpoint.
s3Endpoint = "s3.amazonaws.com"
}
return s3Endpoint
}

View File

@ -52,15 +52,6 @@ func sumHMAC(key []byte, data []byte) []byte {
return hash.Sum(nil)
}
// isPartUploaded - true if part is already uploaded.
func isPartUploaded(objPart objectPart, objectParts map[int]objectPart) (isUploaded bool) {
_, isUploaded = objectParts[objPart.PartNumber]
if isUploaded {
isUploaded = (objPart.ETag == objectParts[objPart.PartNumber].ETag)
}
return
}
// getEndpointURL - construct a new endpoint.
func getEndpointURL(endpoint string, inSecure bool) (*url.URL, error) {
if strings.Contains(endpoint, ":") {
@ -151,9 +142,16 @@ func closeResponse(resp *http.Response) {
}
}
// isVirtualHostSupported - verify if host supports virtual hosted style.
// Currently only Amazon S3 and Google Cloud Storage would support this.
func isVirtualHostSupported(endpointURL *url.URL) bool {
// isVirtualHostSupported - verifies if bucketName can be part of
// virtual host. Currently only Amazon S3 and Google Cloud Storage would
// support this.
func isVirtualHostSupported(endpointURL *url.URL, bucketName string) bool {
// bucketName can be valid but '.' in the hostname will fail SSL
// certificate validation. So do not use host-style for such buckets.
if endpointURL.Scheme == "https" && strings.Contains(bucketName, ".") {
return false
}
// Return true for all other cases
return isAmazonEndpoint(endpointURL) || isGoogleEndpoint(endpointURL)
}
@ -212,13 +210,9 @@ func isValidExpiry(expires time.Duration) error {
return nil
}
/// Excerpts from - http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
/// When using virtual hostedstyle buckets with SSL, the SSL wild card
/// certificate only matches buckets that do not contain periods.
/// To work around this, use HTTP or write your own certificate verification logic.
// We decided to not support bucketNames with '.' in them.
var validBucketName = regexp.MustCompile(`^[a-z0-9][a-z0-9\-]{1,61}[a-z0-9]$`)
// We support '.' with bucket names but we fallback to using path
// style requests instead for such buckets.
var validBucketName = regexp.MustCompile(`^[a-z0-9][a-z0-9\.\-]{1,61}[a-z0-9]$`)
// isValidBucketName - verify bucket name in accordance with
// - http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingBucket.html
@ -235,6 +229,9 @@ func isValidBucketName(bucketName string) error {
if bucketName[0] == '.' || bucketName[len(bucketName)-1] == '.' {
return ErrInvalidBucketName("Bucket name cannot start or end with a '.' dot.")
}
if match, _ := regexp.MatchString("\\.\\.", bucketName); match == true {
return ErrInvalidBucketName("Bucket name cannot have successive periods.")
}
if !validBucketName.MatchString(bucketName) {
return ErrInvalidBucketName("Bucket name contains invalid characters.")
}
@ -267,39 +264,6 @@ func isValidObjectPrefix(objectPrefix string) error {
return nil
}
// optimalPartSize - calculate the optimal part size for the given objectSize.
//
// NOTE: Assumption here is that for any object to be uploaded to any S3 compatible
// object storage it will have the following parameters as constants.
//
// maxParts - 10000
// minimumPartSize - 5MiB
// maximumPartSize - 5GiB
//
// if the partSize after division with maxParts is greater than minimumPartSize
// then choose miniumPartSize as the new part size, if not return minimumPartSize.
//
// Special cases
//
// - if input object size is -1 then return maxPartSize.
// - if it happens to be that partSize is indeed bigger
// than the maximum part size just return maxPartSize.
func optimalPartSize(objectSize int64) int64 {
// if object size is -1 choose part size as 5GiB.
if objectSize == -1 {
return maxPartSize
}
// make sure last part has enough buffer and handle this poperly.
partSize := (objectSize / (maxParts - 1))
if partSize > minimumPartSize {
if partSize > maxPartSize {
return maxPartSize
}
return partSize
}
return minimumPartSize
}
// urlEncodePath encode the strings from UTF-8 byte representations to HTML hex escape sequences
//
// This is necessary since regular url.Parse() and url.Encode() functions do not support UTF-8