From 0237b0d9724eeeae6a86db189400f972cee4cd31 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sat, 2 Jan 2016 14:37:08 +0100 Subject: [PATCH] Update s3 library again --- Godeps/Godeps.json | 4 +- .../src/github.com/minio/minio-go/.travis.yml | 2 +- .../minio/minio-go/api-definitions.go | 10 +- .../minio/minio-go/api-error-response.go | 10 + .../minio/minio-go/api-fput-object.go | 65 +++--- .../src/github.com/minio/minio-go/api-get.go | 10 +- .../src/github.com/minio/minio-go/api-list.go | 4 + .../minio/minio-go/api-put-object-partial.go | 199 +++++++++++++++++- .../minio/minio-go/api-put-object.go | 146 +++++++------ .../minio/minio-go/api-s3-definitions.go | 3 - .../src/github.com/minio/minio-go/api-stat.go | 7 +- .../minio/minio-go/api_functional_test.go | 174 ++++++++++++++- .../github.com/minio/minio-go/appveyor.yml | 2 +- .../src/github.com/minio/minio-go/tempfile.go | 16 -- 14 files changed, 505 insertions(+), 147 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index c6e5457c1..97d89996a 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -24,8 +24,8 @@ }, { "ImportPath": "github.com/minio/minio-go", - "Comment": "v0.2.5-187-gad1597d", - "Rev": "ad1597d864f56f608f8a1694ae9b5970fef57eb6" + "Comment": "v0.2.5-195-gf30b6ca", + "Rev": "f30b6ca90bfda7578f6a11b7ba6af2eae7b0510c" }, { "ImportPath": "github.com/pkg/sftp", diff --git a/Godeps/_workspace/src/github.com/minio/minio-go/.travis.yml b/Godeps/_workspace/src/github.com/minio/minio-go/.travis.yml index 01078a5e7..c55421487 100644 --- a/Godeps/_workspace/src/github.com/minio/minio-go/.travis.yml +++ b/Godeps/_workspace/src/github.com/minio/minio-go/.travis.yml @@ -3,7 +3,7 @@ go: - 1.5.1 script: - go vet ./... -- go test -race -v ./... +- go test -test.short -race -v ./... notifications: slack: secure: HrOX2k6F/sEl6Rr4m5vHOdJCIwV42be0kz1Jy/WSMvrl/fQ8YkldKviLeWh4aWt1kclsYhNQ4FqGML+RIZYsdOqej4fAw9Vi5pZkI1MzPJq0UjrtMqkqzvD90eDGQYCKwaXjEIN8cohwJeb6X0B0HKAd9sqJW5GH5SwnhH5WWP8= diff --git a/Godeps/_workspace/src/github.com/minio/minio-go/api-definitions.go b/Godeps/_workspace/src/github.com/minio/minio-go/api-definitions.go index 7667645a1..123de1850 100644 --- a/Godeps/_workspace/src/github.com/minio/minio-go/api-definitions.go +++ b/Godeps/_workspace/src/github.com/minio/minio-go/api-definitions.go @@ -71,20 +71,20 @@ type ObjectMultipartStat struct { Err error } -// partMetadata - container for each partMetadata. -type partMetadata struct { +// partData - container for each part. +type partData struct { MD5Sum []byte Sha256Sum []byte ReadCloser io.ReadCloser Size int64 - Number int // partMetadata number. + Number int // partData number. // Error Err error } -// putObjectMetadata - container for each single PUT operation. -type putObjectMetadata struct { +// putObjectData - container for each single PUT operation. +type putObjectData struct { MD5Sum []byte Sha256Sum []byte ReadCloser io.ReadCloser diff --git a/Godeps/_workspace/src/github.com/minio/minio-go/api-error-response.go b/Godeps/_workspace/src/github.com/minio/minio-go/api-error-response.go index 0d2496507..4d7e30fc1 100644 --- a/Godeps/_workspace/src/github.com/minio/minio-go/api-error-response.go +++ b/Godeps/_workspace/src/github.com/minio/minio-go/api-error-response.go @@ -218,6 +218,16 @@ func ErrInvalidObjectName(message string) error { } } +// ErrInvalidParts - invalid number of parts. +func ErrInvalidParts(expectedParts, uploadedParts int) error { + msg := fmt.Sprintf("Unexpected number of parts found Want %d, Got %d", expectedParts, uploadedParts) + return ErrorResponse{ + Code: "InvalidParts", + Message: msg, + RequestID: "minio", + } +} + // ErrInvalidObjectPrefix - invalid object prefix response is // similar to object name response. var ErrInvalidObjectPrefix = ErrInvalidObjectName diff --git a/Godeps/_workspace/src/github.com/minio/minio-go/api-fput-object.go b/Godeps/_workspace/src/github.com/minio/minio-go/api-fput-object.go index 059710038..00b10aabb 100644 --- a/Godeps/_workspace/src/github.com/minio/minio-go/api-fput-object.go +++ b/Godeps/_workspace/src/github.com/minio/minio-go/api-fput-object.go @@ -93,7 +93,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string) // NOTE: Google Cloud Storage multipart Put is not compatible with Amazon S3 APIs. // Current implementation will only upload a maximum of 5GiB to Google Cloud Storage servers. if isGoogleEndpoint(c.endpointURL) { - if fileSize <= -1 || fileSize > int64(maxSinglePutObjectSize) { + if fileSize > int64(maxSinglePutObjectSize) { return 0, ErrorResponse{ Code: "NotImplemented", Message: fmt.Sprintf("Invalid Content-Length %d for file uploads to Google Cloud Storage.", fileSize), @@ -108,7 +108,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string) // NOTE: S3 doesn't allow anonymous multipart requests. if isAmazonEndpoint(c.endpointURL) && c.anonymous { - if fileSize <= -1 || fileSize > int64(maxSinglePutObjectSize) { + if fileSize > int64(maxSinglePutObjectSize) { return 0, ErrorResponse{ Code: "NotImplemented", Message: fmt.Sprintf("For anonymous requests Content-Length cannot be %d.", fileSize), @@ -121,14 +121,11 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string) return n, err } - // Large file upload is initiated for uploads for input data size - // if its greater than 5MiB or data size is negative. - if fileSize >= minimumPartSize || fileSize < 0 { - n, err := c.fputLargeObject(bucketName, objectName, fileData, fileSize, contentType) - return n, err + // Small object upload is initiated for uploads for input data size smaller than 5MiB. + if fileSize < minimumPartSize { + return c.putSmallObject(bucketName, objectName, fileData, fileSize, contentType) } - n, err := c.putSmallObject(bucketName, objectName, fileData, fileSize, contentType) - return n, err + return c.fputLargeObject(bucketName, objectName, fileData, fileSize, contentType) } // computeHash - calculates MD5 and Sha256 for an input read Seeker. @@ -192,7 +189,6 @@ func (c Client) fputLargeObject(bucketName, objectName string, fileData *os.File var prevMaxPartSize int64 // Loop through all parts and calculate totalUploadedSize. for _, partInfo := range partsInfo { - totalUploadedSize += partInfo.Size // Choose the maximum part size. if partInfo.Size >= prevMaxPartSize { prevMaxPartSize = partInfo.Size @@ -206,11 +202,14 @@ func (c Client) fputLargeObject(bucketName, objectName string, fileData *os.File partSize = prevMaxPartSize } - // Part number always starts with '1'. - partNumber := 1 + // Part number always starts with '0'. + partNumber := 0 // Loop through until EOF. for totalUploadedSize < fileSize { + // Increment part number. + partNumber++ + // Get a section reader on a particular offset. sectionReader := io.NewSectionReader(fileData, totalUploadedSize, partSize) @@ -221,7 +220,7 @@ func (c Client) fputLargeObject(bucketName, objectName string, fileData *os.File } // Save all the part metadata. - partMdata := partMetadata{ + prtData := partData{ ReadCloser: ioutil.NopCloser(sectionReader), Size: size, MD5Sum: md5Sum, @@ -229,31 +228,26 @@ func (c Client) fputLargeObject(bucketName, objectName string, fileData *os.File Number: partNumber, // Part number to be uploaded. } - // If part number already uploaded, move to the next one. - if isPartUploaded(objectPart{ - ETag: hex.EncodeToString(partMdata.MD5Sum), - PartNumber: partMdata.Number, + // If part not uploaded proceed to upload. + if !isPartUploaded(objectPart{ + ETag: hex.EncodeToString(prtData.MD5Sum), + PartNumber: prtData.Number, }, partsInfo) { - // Close the read closer. - partMdata.ReadCloser.Close() - continue + // Upload the part. + objPart, err := c.uploadPart(bucketName, objectName, uploadID, prtData) + if err != nil { + prtData.ReadCloser.Close() + return totalUploadedSize, err + } + // Save successfully uploaded part metadata. + partsInfo[prtData.Number] = objPart } - // Upload the part. - objPart, err := c.uploadPart(bucketName, objectName, uploadID, partMdata) - if err != nil { - partMdata.ReadCloser.Close() - return totalUploadedSize, err - } + // Close the read closer for temporary file. + prtData.ReadCloser.Close() // Save successfully uploaded size. - totalUploadedSize += partMdata.Size - - // Save successfully uploaded part metadata. - partsInfo[partMdata.Number] = objPart - - // Increment to next part number. - partNumber++ + totalUploadedSize += prtData.Size } // if totalUploadedSize is different than the file 'size'. Do not complete the request throw an error. @@ -269,6 +263,11 @@ func (c Client) fputLargeObject(bucketName, objectName string, fileData *os.File completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart) } + // If partNumber is different than total list of parts, error out. + if partNumber != len(completeMultipartUpload.Parts) { + return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts)) + } + // Sort all completed parts. sort.Sort(completedParts(completeMultipartUpload.Parts)) _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, completeMultipartUpload) diff --git a/Godeps/_workspace/src/github.com/minio/minio-go/api-get.go b/Godeps/_workspace/src/github.com/minio/minio-go/api-get.go index e35dcf930..7596278af 100644 --- a/Godeps/_workspace/src/github.com/minio/minio-go/api-get.go +++ b/Godeps/_workspace/src/github.com/minio/minio-go/api-get.go @@ -365,9 +365,12 @@ func (c Client) getObject(bucketName, objectName string, offset, length int64) ( return nil, ObjectStat{}, HTTPRespToErrorResponse(resp, bucketName, objectName) } } - // trim off the odd double quotes. - md5sum := strings.Trim(resp.Header.Get("ETag"), "\"") - // parse the date. + + // Trim off the odd double quotes from ETag in the beginning and end. + md5sum := strings.TrimPrefix(resp.Header.Get("ETag"), "\"") + md5sum = strings.TrimSuffix(md5sum, "\"") + + // Parse the date. date, err := time.Parse(http.TimeFormat, resp.Header.Get("Last-Modified")) if err != nil { msg := "Last-Modified time format not recognized. " + reportIssue @@ -379,6 +382,7 @@ func (c Client) getObject(bucketName, objectName string, offset, length int64) ( AmzBucketRegion: resp.Header.Get("x-amz-bucket-region"), } } + // Get content-type. contentType := strings.TrimSpace(resp.Header.Get("Content-Type")) if contentType == "" { contentType = "application/octet-stream" diff --git a/Godeps/_workspace/src/github.com/minio/minio-go/api-list.go b/Godeps/_workspace/src/github.com/minio/minio-go/api-list.go index 4de5da89d..8838900a8 100644 --- a/Godeps/_workspace/src/github.com/minio/minio-go/api-list.go +++ b/Godeps/_workspace/src/github.com/minio/minio-go/api-list.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" "net/url" + "strings" ) // ListBuckets list all buckets owned by this authenticated user. @@ -393,6 +394,9 @@ func (c Client) listObjectParts(bucketName, objectName, uploadID string) (partsI } // Append to parts info. for _, part := range listObjPartsResult.ObjectParts { + // Trim off the odd double quotes from ETag in the beginning and end. + part.ETag = strings.TrimPrefix(part.ETag, "\"") + part.ETag = strings.TrimSuffix(part.ETag, "\"") partsInfo[part.PartNumber] = part } // Keep part number marker, for the next iteration. diff --git a/Godeps/_workspace/src/github.com/minio/minio-go/api-put-object-partial.go b/Godeps/_workspace/src/github.com/minio/minio-go/api-put-object-partial.go index 3b7a5b733..8c05d8858 100644 --- a/Godeps/_workspace/src/github.com/minio/minio-go/api-put-object-partial.go +++ b/Godeps/_workspace/src/github.com/minio/minio-go/api-put-object-partial.go @@ -17,11 +17,14 @@ package minio import ( + "bytes" "crypto/md5" "crypto/sha256" "errors" + "fmt" "hash" "io" + "io/ioutil" "sort" ) @@ -34,9 +37,187 @@ func (c Client) PutObjectPartial(bucketName, objectName string, data ReadAtClose if err := isValidObjectName(objectName); err != nil { return 0, err } + // Input size negative should return error. + if size < 0 { + return 0, ErrInvalidArgument("Input file size cannot be negative.") + } + // Input size bigger than 5TiB should fail. + if size > int64(maxMultipartPutObjectSize) { + return 0, ErrInvalidArgument("Input file size is bigger than the supported maximum of 5TiB.") + } - // Cleanup any previously left stale files, as the function exits. - defer cleanupStaleTempfiles("multiparts$-putobject-partial") + // NOTE: Google Cloud Storage does not implement Amazon S3 Compatible multipart PUT. + // So we fall back to single PUT operation with the maximum limit of 5GiB. + if isGoogleEndpoint(c.endpointURL) { + if size > int64(maxSinglePutObjectSize) { + return 0, ErrorResponse{ + Code: "NotImplemented", + Message: fmt.Sprintf("Invalid Content-Length %d for file uploads to Google Cloud Storage.", size), + Key: objectName, + BucketName: bucketName, + } + } + // Do not compute MD5 for Google Cloud Storage. Uploads upto 5GiB in size. + n, err := c.putPartialNoChksum(bucketName, objectName, data, size, contentType) + return n, err + } + + // NOTE: S3 doesn't allow anonymous multipart requests. + if isAmazonEndpoint(c.endpointURL) && c.anonymous { + if size > int64(maxSinglePutObjectSize) { + return 0, ErrorResponse{ + Code: "NotImplemented", + Message: fmt.Sprintf("For anonymous requests Content-Length cannot be %d.", size), + Key: objectName, + BucketName: bucketName, + } + } + // Do not compute MD5 for anonymous requests to Amazon S3. Uploads upto 5GiB in size. + n, err := c.putPartialAnonymous(bucketName, objectName, data, size, contentType) + return n, err + } + + // Small file upload is initiated for uploads for input data size smaller than 5MiB. + if size < minimumPartSize { + n, err = c.putPartialSmallObject(bucketName, objectName, data, size, contentType) + return n, err + } + n, err = c.putPartialLargeObject(bucketName, objectName, data, size, contentType) + return n, err + +} + +// putNoChecksumPartial special function used Google Cloud Storage. This special function +// is used for Google Cloud Storage since Google's multipart API is not S3 compatible. +func (c Client) putPartialNoChksum(bucketName, objectName string, data ReadAtCloser, size int64, contentType string) (n int64, err error) { + // Input validation. + if err := isValidBucketName(bucketName); err != nil { + return 0, err + } + if err := isValidObjectName(objectName); err != nil { + return 0, err + } + if size > maxPartSize { + return 0, ErrEntityTooLarge(size, bucketName, objectName) + } + + // Create a new pipe to stage the reads. + reader, writer := io.Pipe() + + // readAtOffset to carry future offsets. + var readAtOffset int64 + + // readAt defaults to reading at 5MiB buffer. + readAtBuffer := make([]byte, 1024*1024*5) + + // Initiate a routine to start writing. + go func() { + for { + readAtSize, rerr := data.ReadAt(readAtBuffer, readAtOffset) + if rerr != nil { + if rerr != io.EOF { + writer.CloseWithError(rerr) + return + } + } + writeSize, werr := writer.Write(readAtBuffer[:readAtSize]) + if werr != nil { + writer.CloseWithError(werr) + return + } + if readAtSize != writeSize { + writer.CloseWithError(errors.New("Something really bad happened here. " + reportIssue)) + return + } + readAtOffset += int64(writeSize) + if rerr == io.EOF { + writer.Close() + return + } + } + }() + // For anonymous requests, we will not calculate sha256 and md5sum. + putObjData := putObjectData{ + MD5Sum: nil, + Sha256Sum: nil, + ReadCloser: reader, + Size: size, + ContentType: contentType, + } + // Execute put object. + st, err := c.putObject(bucketName, objectName, putObjData) + if err != nil { + return 0, err + } + if st.Size != size { + return 0, ErrUnexpectedEOF(st.Size, size, bucketName, objectName) + } + return size, nil +} + +// putAnonymousPartial is a special function for uploading content as anonymous request. +// This special function is necessary since Amazon S3 doesn't allow anonymous multipart uploads. +func (c Client) putPartialAnonymous(bucketName, objectName string, data ReadAtCloser, size int64, contentType string) (n int64, err error) { + // Input validation. + if err := isValidBucketName(bucketName); err != nil { + return 0, err + } + if err := isValidObjectName(objectName); err != nil { + return 0, err + } + return c.putPartialNoChksum(bucketName, objectName, data, size, contentType) +} + +// putSmallObjectPartial uploads files smaller than 5MiB. +func (c Client) putPartialSmallObject(bucketName, objectName string, data ReadAtCloser, size int64, contentType string) (n int64, err error) { + // Input validation. + if err := isValidBucketName(bucketName); err != nil { + return 0, err + } + if err := isValidObjectName(objectName); err != nil { + return 0, err + } + + // readAt defaults to reading at 5MiB buffer. + readAtBuffer := make([]byte, size) + readAtSize, err := data.ReadAt(readAtBuffer, 0) + if err != nil { + if err != io.EOF { + return 0, err + } + } + if int64(readAtSize) != size { + return 0, ErrUnexpectedEOF(int64(readAtSize), size, bucketName, objectName) + } + + // Construct a new PUT object metadata. + putObjData := putObjectData{ + MD5Sum: sumMD5(readAtBuffer), + Sha256Sum: sum256(readAtBuffer), + ReadCloser: ioutil.NopCloser(bytes.NewReader(readAtBuffer)), + Size: size, + ContentType: contentType, + } + // Single part use case, use putObject directly. + st, err := c.putObject(bucketName, objectName, putObjData) + if err != nil { + return 0, err + } + if st.Size != size { + return 0, ErrUnexpectedEOF(st.Size, size, bucketName, objectName) + } + return size, nil +} + +// putPartialLargeObject uploads files bigger than 5MiB. +func (c Client) putPartialLargeObject(bucketName, objectName string, data ReadAtCloser, size int64, contentType string) (n int64, err error) { + // Input validation. + if err := isValidBucketName(bucketName); err != nil { + return 0, err + } + if err := isValidObjectName(objectName); err != nil { + return 0, err + } // getUploadID for an object, initiates a new multipart request // if it cannot find any previously partially uploaded object. @@ -139,7 +320,7 @@ func (c Client) PutObjectPartial(bucketName, objectName string, data ReadAtClose } // Save all the part metadata. - partMdata := partMetadata{ + prtData := partData{ ReadCloser: tmpFile, MD5Sum: hashMD5.Sum(nil), Size: totalReadPartSize, @@ -147,25 +328,25 @@ func (c Client) PutObjectPartial(bucketName, objectName string, data ReadAtClose // Signature version '4'. if c.signature.isV4() { - partMdata.Sha256Sum = hashSha256.Sum(nil) + prtData.Sha256Sum = hashSha256.Sum(nil) } // Current part number to be uploaded. - partMdata.Number = partNumber + prtData.Number = partNumber // execute upload part. - objPart, err := c.uploadPart(bucketName, objectName, uploadID, partMdata) + objPart, err := c.uploadPart(bucketName, objectName, uploadID, prtData) if err != nil { // Close the read closer. - partMdata.ReadCloser.Close() + prtData.ReadCloser.Close() return totalUploadedSize, err } // Save successfully uploaded size. - totalUploadedSize += partMdata.Size + totalUploadedSize += prtData.Size // Save successfully uploaded part metadata. - partsInfo[partMdata.Number] = objPart + partsInfo[prtData.Number] = objPart // Move to next part. partNumber++ diff --git a/Godeps/_workspace/src/github.com/minio/minio-go/api-put-object.go b/Godeps/_workspace/src/github.com/minio/minio-go/api-put-object.go index 300ed4b40..563856bae 100644 --- a/Godeps/_workspace/src/github.com/minio/minio-go/api-put-object.go +++ b/Godeps/_workspace/src/github.com/minio/minio-go/api-put-object.go @@ -50,8 +50,8 @@ func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].Part // - For size input as -1 PutObject does a multipart Put operation until input stream reaches EOF. // Maximum object size that can be uploaded through this operation will be 5TiB. // -// NOTE: Google Cloud Storage multipart Put is not compatible with Amazon S3 APIs. -// Current implementation will only upload a maximum of 5GiB to Google Cloud Storage servers. +// NOTE: Google Cloud Storage does not implement Amazon S3 Compatible multipart PUT. +// So we fall back to single PUT operation with the maximum limit of 5GiB. // // NOTE: For anonymous requests Amazon S3 doesn't allow multipart upload. So we fall back to single PUT operation. func (c Client) PutObject(bucketName, objectName string, data io.Reader, size int64, contentType string) (n int64, err error) { @@ -63,8 +63,8 @@ func (c Client) PutObject(bucketName, objectName string, data io.Reader, size in return 0, err } - // NOTE: Google Cloud Storage multipart Put is not compatible with Amazon S3 APIs. - // Current implementation will only upload a maximum of 5GiB to Google Cloud Storage servers. + // NOTE: Google Cloud Storage does not implement Amazon S3 Compatible multipart PUT. + // So we fall back to single PUT operation with the maximum limit of 5GiB. if isGoogleEndpoint(c.endpointURL) { if size <= -1 { return 0, ErrorResponse{ @@ -114,7 +114,7 @@ func (c Client) putNoChecksum(bucketName, objectName string, data io.Reader, siz return 0, ErrEntityTooLarge(size, bucketName, objectName) } // For anonymous requests, we will not calculate sha256 and md5sum. - putObjMetadata := putObjectMetadata{ + putObjData := putObjectData{ MD5Sum: nil, Sha256Sum: nil, ReadCloser: ioutil.NopCloser(data), @@ -122,9 +122,13 @@ func (c Client) putNoChecksum(bucketName, objectName string, data io.Reader, siz ContentType: contentType, } // Execute put object. - if _, err := c.putObject(bucketName, objectName, putObjMetadata); err != nil { + st, err := c.putObject(bucketName, objectName, putObjData) + if err != nil { return 0, err } + if st.Size != size { + return 0, ErrUnexpectedEOF(st.Size, size, bucketName, objectName) + } return size, nil } @@ -160,7 +164,7 @@ func (c Client) putSmallObject(bucketName, objectName string, data io.Reader, si return 0, ErrUnexpectedEOF(int64(len(dataBytes)), size, bucketName, objectName) } // Construct a new PUT object metadata. - putObjMetadata := putObjectMetadata{ + putObjData := putObjectData{ MD5Sum: sumMD5(dataBytes), Sha256Sum: sum256(dataBytes), ReadCloser: ioutil.NopCloser(bytes.NewReader(dataBytes)), @@ -168,9 +172,13 @@ func (c Client) putSmallObject(bucketName, objectName string, data io.Reader, si ContentType: contentType, } // Single part use case, use putObject directly. - if _, err := c.putObject(bucketName, objectName, putObjMetadata); err != nil { + st, err := c.putObject(bucketName, objectName, putObjData) + if err != nil { return 0, err } + if st.Size != size { + return 0, ErrUnexpectedEOF(st.Size, size, bucketName, objectName) + } return size, nil } @@ -189,12 +197,13 @@ func (c Client) hashCopy(writer io.ReadWriteSeeker, data io.Reader, partSize int // Copies to input at writer. size, err = io.CopyN(hashWriter, data, partSize) if err != nil { + // If not EOF return error right here. if err != io.EOF { return nil, nil, 0, err } } - // Seek back to beginning of input. + // Seek back to beginning of input, any error fail right here. if _, err := writer.Seek(0, 0); err != nil { return nil, nil, 0, err } @@ -204,7 +213,7 @@ func (c Client) hashCopy(writer io.ReadWriteSeeker, data io.Reader, partSize int if c.signature.isV4() { sha256Sum = hashSha256.Sum(nil) } - return md5Sum, sha256Sum, size, nil + return md5Sum, sha256Sum, size, err } // putLargeObject uploads files bigger than 5 mega bytes. @@ -217,9 +226,6 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si return 0, err } - // Cleanup any previously left stale files, as the function exits. - defer cleanupStaleTempfiles("multiparts$-putobject") - // getUploadID for an object, initiates a new multipart request // if it cannot find any previously partially uploaded object. uploadID, err := c.getUploadID(bucketName, objectName, contentType) @@ -242,7 +248,6 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si var prevMaxPartSize int64 // Loop through all parts and calculate totalUploadedSize. for _, partInfo := range partsInfo { - totalUploadedSize += partInfo.Size // Choose the maximum part size. if partInfo.Size >= prevMaxPartSize { prevMaxPartSize = partInfo.Size @@ -256,15 +261,13 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si partSize = prevMaxPartSize } - // Part number always starts with '1'. - partNumber := 1 + // Part number always starts with '0'. + partNumber := 0 // Loop through until EOF. for { - // We have reached EOF, break out. - if totalUploadedSize == size { - break - } + // Increment part number. + partNumber++ // Initialize a new temporary file. tmpFile, err := newTempFile("multiparts$-putobject") @@ -273,15 +276,15 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si } // Calculates MD5 and Sha256 sum while copying partSize bytes into tmpFile. - md5Sum, sha256Sum, size, err := c.hashCopy(tmpFile, data, partSize) - if err != nil { - if err != io.EOF { - return 0, err + md5Sum, sha256Sum, size, rErr := c.hashCopy(tmpFile, data, partSize) + if rErr != nil { + if rErr != io.EOF { + return 0, rErr } } // Save all the part metadata. - partMdata := partMetadata{ + prtData := partData{ ReadCloser: tmpFile, Size: size, MD5Sum: md5Sum, @@ -289,39 +292,28 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si Number: partNumber, // Current part number to be uploaded. } - // If part number already uploaded, move to the next one. - if isPartUploaded(objectPart{ - ETag: hex.EncodeToString(partMdata.MD5Sum), + // If part not uploaded proceed to upload. + if !isPartUploaded(objectPart{ + ETag: hex.EncodeToString(prtData.MD5Sum), PartNumber: partNumber, }, partsInfo) { - // Close the read closer. - partMdata.ReadCloser.Close() - continue + // execute upload part. + objPart, err := c.uploadPart(bucketName, objectName, uploadID, prtData) + if err != nil { + // Close the read closer. + prtData.ReadCloser.Close() + return 0, err + } + // Save successfully uploaded part metadata. + partsInfo[prtData.Number] = objPart } - // execute upload part. - objPart, err := c.uploadPart(bucketName, objectName, uploadID, partMdata) - if err != nil { - // Close the read closer. - partMdata.ReadCloser.Close() - return totalUploadedSize, err - } + // Close the read closer. + prtData.ReadCloser.Close() - // Save successfully uploaded size. - totalUploadedSize += partMdata.Size - - // Save successfully uploaded part metadata. - partsInfo[partMdata.Number] = objPart - - // Move to next part. - partNumber++ - } - - // If size is greater than zero verify totalWritten. - // if totalWritten is different than the input 'size', do not complete the request throw an error. - if size > 0 { - if totalUploadedSize != size { - return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName) + // If read error was an EOF, break out of the loop. + if rErr == io.EOF { + break } } @@ -331,6 +323,21 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si complPart.ETag = part.ETag complPart.PartNumber = part.PartNumber completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart) + // Save successfully uploaded size. + totalUploadedSize += part.Size + } + + // If size is greater than zero verify totalUploadedSize. if totalUploadedSize is + // different than the input 'size', do not complete the request throw an error. + if size > 0 { + if totalUploadedSize != size { + return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName) + } + } + + // If partNumber is different than total list of parts, error out. + if partNumber != len(completeMultipartUpload.Parts) { + return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts)) } // Sort all completed parts. @@ -346,7 +353,7 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si // putObject - add an object to a bucket. // NOTE: You must have WRITE permissions on a bucket to add an object to it. -func (c Client) putObject(bucketName, objectName string, putObjMetadata putObjectMetadata) (ObjectStat, error) { +func (c Client) putObject(bucketName, objectName string, putObjData putObjectData) (ObjectStat, error) { // Input validation. if err := isValidBucketName(bucketName); err != nil { return ObjectStat{}, err @@ -355,23 +362,23 @@ func (c Client) putObject(bucketName, objectName string, putObjMetadata putObjec return ObjectStat{}, err } - if strings.TrimSpace(putObjMetadata.ContentType) == "" { - putObjMetadata.ContentType = "application/octet-stream" + if strings.TrimSpace(putObjData.ContentType) == "" { + putObjData.ContentType = "application/octet-stream" } // Set headers. customHeader := make(http.Header) - customHeader.Set("Content-Type", putObjMetadata.ContentType) + customHeader.Set("Content-Type", putObjData.ContentType) // Populate request metadata. reqMetadata := requestMetadata{ bucketName: bucketName, objectName: objectName, customHeader: customHeader, - contentBody: putObjMetadata.ReadCloser, - contentLength: putObjMetadata.Size, - contentSha256Bytes: putObjMetadata.Sha256Sum, - contentMD5Bytes: putObjMetadata.MD5Sum, + contentBody: putObjData.ReadCloser, + contentLength: putObjData.Size, + contentSha256Bytes: putObjData.Sha256Sum, + contentMD5Bytes: putObjData.MD5Sum, } // Initiate new request. req, err := c.newRequest("PUT", reqMetadata) @@ -389,11 +396,15 @@ func (c Client) putObject(bucketName, objectName string, putObjMetadata putObjec return ObjectStat{}, HTTPRespToErrorResponse(resp, bucketName, objectName) } } + var metadata ObjectStat - // Trim off the odd double quotes from ETag. - metadata.ETag = strings.Trim(resp.Header.Get("ETag"), "\"") + // Trim off the odd double quotes from ETag in the beginning and end. + metadata.ETag = strings.TrimPrefix(resp.Header.Get("ETag"), "\"") + metadata.ETag = strings.TrimSuffix(metadata.ETag, "\"") // A success here means data was written to server successfully. - metadata.Size = putObjMetadata.Size + metadata.Size = putObjData.Size + + // Return here. return metadata, nil } @@ -452,7 +463,7 @@ func (c Client) initiateMultipartUpload(bucketName, objectName, contentType stri } // uploadPart uploads a part in a multipart upload. -func (c Client) uploadPart(bucketName, objectName, uploadID string, uploadingPart partMetadata) (objectPart, error) { +func (c Client) uploadPart(bucketName, objectName, uploadID string, uploadingPart partData) (objectPart, error) { // Input validation. if err := isValidBucketName(bucketName); err != nil { return objectPart{}, err @@ -496,8 +507,11 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, uploadingPar } // Once successfully uploaded, return completed part. objPart := objectPart{} + objPart.Size = uploadingPart.Size objPart.PartNumber = uploadingPart.Number - objPart.ETag = resp.Header.Get("ETag") + // Trim off the odd double quotes from ETag in the beginning and end. + objPart.ETag = strings.TrimPrefix(resp.Header.Get("ETag"), "\"") + objPart.ETag = strings.TrimSuffix(objPart.ETag, "\"") return objPart, nil } diff --git a/Godeps/_workspace/src/github.com/minio/minio-go/api-s3-definitions.go b/Godeps/_workspace/src/github.com/minio/minio-go/api-s3-definitions.go index 16d87a70e..61931b0b3 100644 --- a/Godeps/_workspace/src/github.com/minio/minio-go/api-s3-definitions.go +++ b/Godeps/_workspace/src/github.com/minio/minio-go/api-s3-definitions.go @@ -103,9 +103,6 @@ type objectPart struct { // Size of the uploaded part data. Size int64 - - // Error - Err error } // listObjectPartsResult container for ListObjectParts response. diff --git a/Godeps/_workspace/src/github.com/minio/minio-go/api-stat.go b/Godeps/_workspace/src/github.com/minio/minio-go/api-stat.go index 9c5e96cf3..8a29bccd5 100644 --- a/Godeps/_workspace/src/github.com/minio/minio-go/api-stat.go +++ b/Godeps/_workspace/src/github.com/minio/minio-go/api-stat.go @@ -73,7 +73,12 @@ func (c Client) StatObject(bucketName, objectName string) (ObjectStat, error) { return ObjectStat{}, HTTPRespToErrorResponse(resp, bucketName, objectName) } } - md5sum := strings.Trim(resp.Header.Get("ETag"), "\"") // trim off the odd double quotes + + // Trim off the odd double quotes from ETag in the beginning and end. + md5sum := strings.TrimPrefix(resp.Header.Get("ETag"), "\"") + md5sum = strings.TrimSuffix(md5sum, "\"") + + // Parse content length. size, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) if err != nil { return ObjectStat{}, ErrorResponse{ diff --git a/Godeps/_workspace/src/github.com/minio/minio-go/api_functional_test.go b/Godeps/_workspace/src/github.com/minio/minio-go/api_functional_test.go index 886959de3..f7bd81097 100644 --- a/Godeps/_workspace/src/github.com/minio/minio-go/api_functional_test.go +++ b/Godeps/_workspace/src/github.com/minio/minio-go/api_functional_test.go @@ -54,6 +54,142 @@ func randString(n int, src rand.Source) string { return string(b[0:30]) } +func TestResumableFPutObject(t *testing.T) { + if testing.Short() { + t.Skip("skipping resumable tests with short runs") + } + + // Seed random based on current time. + rand.Seed(time.Now().Unix()) + + // Connect and make sure bucket exists. + c, err := minio.New( + "play.minio.io:9002", + "Q3AM3UQ867SPQQA43P2F", + "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG", + false, + ) + if err != nil { + t.Fatal("Error:", err) + } + + // Set user agent. + c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0") + + // Enable tracing, write to stdout. + // c.TraceOn(nil) + + // Generate a new random bucket name. + bucketName := randString(60, rand.NewSource(time.Now().UnixNano())) + + // make a new bucket. + err = c.MakeBucket(bucketName, "private", "us-east-1") + if err != nil { + t.Fatal("Error:", err, bucketName) + } + + file, err := ioutil.TempFile(os.TempDir(), "resumable") + if err != nil { + t.Fatal("Error:", err) + } + + n, _ := io.CopyN(file, crand.Reader, 11*1024*1024) + if n != int64(11*1024*1024) { + t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", 11*1024*1024, n) + } + + objectName := bucketName + "-resumable" + + n, err = c.FPutObject(bucketName, objectName, file.Name(), "application/octet-stream") + if err != nil { + t.Fatal("Error:", err) + } + if n != int64(11*1024*1024) { + t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", 11*1024*1024, n) + } + + // Close the file pro-actively for windows. + file.Close() + + err = c.RemoveObject(bucketName, objectName) + if err != nil { + t.Fatal("Error: ", err) + } + + err = c.RemoveBucket(bucketName) + if err != nil { + t.Fatal("Error:", err) + } + + err = os.Remove(file.Name()) + if err != nil { + t.Fatal("Error:", err) + } +} + +func TestResumablePutObject(t *testing.T) { + if testing.Short() { + t.Skip("skipping resumable tests with short runs") + } + + // Seed random based on current time. + rand.Seed(time.Now().Unix()) + + // Connect and make sure bucket exists. + c, err := minio.New( + "play.minio.io:9002", + "Q3AM3UQ867SPQQA43P2F", + "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG", + false, + ) + if err != nil { + t.Fatal("Error:", err) + } + + // Set user agent. + c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0") + + // Enable tracing, write to stdout. + // c.TraceOn(nil) + + // Generate a new random bucket name. + bucketName := randString(60, rand.NewSource(time.Now().UnixNano())) + + // make a new bucket. + err = c.MakeBucket(bucketName, "private", "us-east-1") + if err != nil { + t.Fatal("Error:", err, bucketName) + } + + // generate 11MB + buf := make([]byte, 11*1024*1024) + + _, err = io.ReadFull(crand.Reader, buf) + if err != nil { + t.Fatal("Error:", err) + } + + objectName := bucketName + "-resumable" + reader := bytes.NewReader(buf) + n, err := c.PutObject(bucketName, objectName, reader, int64(reader.Len()), "application/octet-stream") + if err != nil { + t.Fatal("Error:", err, bucketName, objectName) + } + if n != int64(len(buf)) { + t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n) + } + + err = c.RemoveObject(bucketName, objectName) + if err != nil { + t.Fatal("Error: ", err) + } + + err = c.RemoveBucket(bucketName) + if err != nil { + t.Fatal("Error:", err) + } +} + func TestGetObjectPartialFunctional(t *testing.T) { // Seed random based on current time. rand.Seed(time.Now().Unix()) @@ -177,6 +313,14 @@ func TestGetObjectPartialFunctional(t *testing.T) { t.Fatal("Error:", err, len(buf6)) } } + err = c.RemoveObject(bucketName, objectName) + if err != nil { + t.Fatal("Error: ", err) + } + err = c.RemoveBucket(bucketName) + if err != nil { + t.Fatal("Error:", err) + } } func TestFunctional(t *testing.T) { @@ -271,14 +415,26 @@ func TestFunctional(t *testing.T) { // generate data buf := make([]byte, rand.Intn(1<<19)) - reader := bytes.NewReader(buf) + _, err = io.ReadFull(crand.Reader, buf) + if err != nil { + t.Fatal("Error: ", err) + } - n, err := c.PutObject(bucketName, objectName, reader, int64(reader.Len()), "") + n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), int64(len(buf)), "") if err != nil { t.Fatal("Error: ", err) } if n != int64(len(buf)) { - t.Fatal("Error: bad length ", n, reader.Len()) + t.Fatal("Error: bad length ", n, len(buf)) + } + + n, err = c.PutObject(bucketName, objectName+"-nolength", bytes.NewReader(buf), -1, "binary/octet-stream") + if err != nil { + t.Fatal("Error:", err, bucketName, objectName+"-nolength") + } + + if n != int64(len(buf)) { + t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n) } newReader, _, err := c.GetObject(bucketName, objectName) @@ -333,6 +489,10 @@ func TestFunctional(t *testing.T) { t.Fatal("Error: ", err) } buf = make([]byte, rand.Intn(1<<20)) + _, err = io.ReadFull(crand.Reader, buf) + if err != nil { + t.Fatal("Error: ", err) + } req, err := http.NewRequest("PUT", presignedPutURL, bytes.NewReader(buf)) if err != nil { t.Fatal("Error: ", err) @@ -365,25 +525,25 @@ func TestFunctional(t *testing.T) { if err != nil { t.Fatal("Error: ", err) } + err = c.RemoveObject(bucketName, objectName+"-nolength") + if err != nil { + t.Fatal("Error: ", err) + } err = c.RemoveObject(bucketName, objectName+"-presigned") if err != nil { t.Fatal("Error: ", err) } - err = c.RemoveBucket(bucketName) if err != nil { t.Fatal("Error:", err) } - err = c.RemoveBucket("bucket1") if err == nil { t.Fatal("Error:") } - if err.Error() != "The specified bucket does not exist." { t.Fatal("Error: ", err) } - if err = os.Remove(fileName); err != nil { t.Fatal("Error: ", err) } diff --git a/Godeps/_workspace/src/github.com/minio/minio-go/appveyor.yml b/Godeps/_workspace/src/github.com/minio/minio-go/appveyor.yml index 963698a04..444696bc5 100644 --- a/Godeps/_workspace/src/github.com/minio/minio-go/appveyor.yml +++ b/Godeps/_workspace/src/github.com/minio/minio-go/appveyor.yml @@ -27,7 +27,7 @@ build_script: - golint github.com/minio/minio-go... - deadcode - go test - - go test -race + - go test -test.short -race # to disable automatic tests test: off diff --git a/Godeps/_workspace/src/github.com/minio/minio-go/tempfile.go b/Godeps/_workspace/src/github.com/minio/minio-go/tempfile.go index 34508569f..e9fada3e6 100644 --- a/Godeps/_workspace/src/github.com/minio/minio-go/tempfile.go +++ b/Godeps/_workspace/src/github.com/minio/minio-go/tempfile.go @@ -19,7 +19,6 @@ package minio import ( "io/ioutil" "os" - "path/filepath" "sync" ) @@ -42,21 +41,6 @@ func newTempFile(prefix string) (*tempFile, error) { }, nil } -// cleanupStaleTempFiles - cleanup any stale files present in temp directory at a prefix. -func cleanupStaleTempfiles(prefix string) error { - globPath := filepath.Join(os.TempDir(), prefix) + "*" - staleFiles, err := filepath.Glob(globPath) - if err != nil { - return err - } - for _, staleFile := range staleFiles { - if err := os.Remove(staleFile); err != nil { - return err - } - } - return nil -} - // Close - closer wrapper to close and remove temporary file. func (t *tempFile) Close() error { t.mutex.Lock()