update s3 library for bugfix

This commit is contained in:
Alexander Neumann 2015-12-30 12:19:19 +01:00
parent d79c85af62
commit a73c4bd5a7
11 changed files with 386 additions and 59 deletions

4
Godeps/Godeps.json generated
View File

@ -24,8 +24,8 @@
},
{
"ImportPath": "github.com/minio/minio-go",
"Comment": "v0.2.5-177-g691a38d",
"Rev": "691a38d161d6dfc0e8e78dc5360bc39f48a8626d"
"Comment": "v0.2.5-185-g654a97a",
"Rev": "654a97a4d165dabf422bec2ef6673bcd9d3daf00"
},
{
"ImportPath": "github.com/pkg/sftp",

View File

@ -17,6 +17,7 @@
package minio
import (
"bytes"
"errors"
"fmt"
"io"
@ -55,7 +56,7 @@ func (c Client) GetBucketACL(bucketName string) (BucketACL, error) {
}
// Initiate the request.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return "", err
@ -185,7 +186,7 @@ func (c Client) GetObjectPartial(bucketName, objectName string) (ReadAtCloser, O
// Get shortest length.
// NOTE: Last remaining bytes are usually smaller than
// req.Buffer size. Use that as the final length.
length := math.Min(float64(len(req.Buffer)), float64(objectStat.Size-req.Offset))
length := math.Min(float64(req.Buffer.Len()), float64(objectStat.Size-req.Offset))
httpReader, _, err := c.getObject(bucketName, objectName, req.Offset, int64(length))
if err != nil {
resCh <- readAtResponse{
@ -193,9 +194,9 @@ func (c Client) GetObjectPartial(bucketName, objectName string) (ReadAtCloser, O
}
return
}
size, err := httpReader.Read(req.Buffer)
size, err := io.CopyN(req.Buffer, httpReader, int64(length))
resCh <- readAtResponse{
Size: size,
Size: int(size),
Error: err,
}
}
@ -213,8 +214,8 @@ type readAtResponse struct {
// request message container to communicate with internal go-routine.
type readAtRequest struct {
Buffer []byte // requested bytes.
Offset int64 // readAt offset.
Buffer *bytes.Buffer
Offset int64 // readAt offset.
}
// objectReadAtCloser container for io.ReadAtCloser.
@ -247,11 +248,16 @@ func newObjectReadAtCloser(reqCh chan<- readAtRequest, resCh <-chan readAtRespon
// It returns the number of bytes read and the error, if any.
// ReadAt always returns a non-nil error when n < len(b).
// At end of file, that error is io.EOF.
func (r *objectReadAtCloser) ReadAt(p []byte, offset int64) (int, error) {
func (r *objectReadAtCloser) ReadAt(b []byte, offset int64) (int, error) {
// Locking.
r.mutex.Lock()
defer r.mutex.Unlock()
// if offset is negative and offset is greater than or equal to object size we return EOF.
if offset < 0 || offset >= r.objectSize {
return 0, io.EOF
}
// prevErr is which was saved in previous operation.
if r.prevErr != nil {
return 0, r.prevErr
@ -261,7 +267,7 @@ func (r *objectReadAtCloser) ReadAt(p []byte, offset int64) (int, error) {
reqMsg := readAtRequest{}
// Send the current offset and bytes requested.
reqMsg.Buffer = p
reqMsg.Buffer = bytes.NewBuffer(b)
reqMsg.Offset = offset
// Send read request over the control channel.
@ -270,15 +276,21 @@ func (r *objectReadAtCloser) ReadAt(p []byte, offset int64) (int, error) {
// Get data over the response channel.
dataMsg := <-r.resCh
// Bytes read.
bytesRead := int64(dataMsg.Size)
if dataMsg.Error == nil {
// If offset+bytes read is equal to objectSize
// we have reached end of file, we return io.EOF.
if offset+bytesRead == r.objectSize {
return dataMsg.Size, io.EOF
}
return dataMsg.Size, nil
}
// Save any error.
r.prevErr = dataMsg.Error
if dataMsg.Error != nil {
if dataMsg.Error == io.EOF {
return dataMsg.Size, dataMsg.Error
}
return 0, dataMsg.Error
}
return dataMsg.Size, nil
return dataMsg.Size, dataMsg.Error
}
// Closer is the interface that wraps the basic Close method.
@ -340,7 +352,7 @@ func (c Client) getObject(bucketName, objectName string, offset, length int64) (
return nil, ObjectStat{}, err
}
// Execute the request.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
if err != nil {
return nil, ObjectStat{}, err
}

View File

@ -39,7 +39,7 @@ func (c Client) ListBuckets() ([]BucketStat, error) {
return nil, err
}
// Initiate the request.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return nil, err
@ -197,7 +197,7 @@ func (c Client) listObjectsQuery(bucketName, objectPrefix, objectMarker, delimit
return listBucketResult{}, err
}
// Execute list buckets.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return listBucketResult{}, err
@ -361,7 +361,7 @@ func (c Client) listMultipartUploadsQuery(bucketName, keyMarker, uploadIDMarker,
return listMultipartUploadsResult{}, err
}
// Execute list multipart uploads request.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return listMultipartUploadsResult{}, err
@ -466,7 +466,7 @@ func (c Client) listObjectPartsQuery(bucketName, objectName, uploadID string, pa
return listObjectPartsResult{}, err
}
// Exectue list object parts.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return listObjectPartsResult{}, err

View File

@ -67,7 +67,7 @@ func (c Client) MakeBucket(bucketName string, acl BucketACL, location string) er
}
// Execute the request.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return err
@ -201,7 +201,7 @@ func (c Client) SetBucketACL(bucketName string, acl BucketACL) error {
}
// Initiate the request.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return err

View File

@ -379,7 +379,7 @@ func (c Client) putObject(bucketName, objectName string, putObjMetadata putObjec
return ObjectStat{}, err
}
// Execute the request.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return ObjectStat{}, err
@ -432,7 +432,7 @@ func (c Client) initiateMultipartUpload(bucketName, objectName, contentType stri
return initiateMultipartUploadResult{}, err
}
// Execute the request.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return initiateMultipartUploadResult{}, err
@ -484,7 +484,7 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, uploadingPar
return objectPart{}, err
}
// Execute the request.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return objectPart{}, err
@ -539,7 +539,7 @@ func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string,
}
// Execute the request.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return completeMultipartUploadResult{}, err

View File

@ -35,7 +35,7 @@ func (c Client) RemoveBucket(bucketName string) error {
if err != nil {
return err
}
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return err
@ -67,7 +67,7 @@ func (c Client) RemoveObject(bucketName, objectName string) error {
if err != nil {
return err
}
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return err
@ -137,8 +137,9 @@ func (c Client) abortMultipartUpload(bucketName, objectName, uploadID string) er
if err != nil {
return err
}
// execute the request.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return err

View File

@ -34,7 +34,7 @@ func (c Client) BucketExists(bucketName string) error {
if err != nil {
return err
}
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return err
@ -63,7 +63,7 @@ func (c Client) StatObject(bucketName, objectName string) (ObjectStat, error) {
if err != nil {
return ObjectStat{}, err
}
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return ObjectStat{}, err

View File

@ -19,10 +19,14 @@ package minio
import (
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/url"
"os"
"runtime"
"strings"
"time"
)
@ -44,6 +48,10 @@ type Client struct {
// Needs allocation.
httpClient *http.Client
bucketLocCache *bucketLocationCache
// Advanced functionality
isTraceEnabled bool
traceOutput io.Writer
}
// Global constants.
@ -159,6 +167,26 @@ func (c *Client) SetCustomTransport(customHTTPTransport http.RoundTripper) {
}
}
// TraceOn - enable HTTP tracing.
func (c *Client) TraceOn(outputStream io.Writer) error {
// if outputStream is nil then default to os.Stdout.
if outputStream == nil {
outputStream = os.Stdout
}
// Sets a new output stream.
c.traceOutput = outputStream
// Enable tracing.
c.isTraceEnabled = true
return nil
}
// TraceOff - disable HTTP tracing.
func (c *Client) TraceOff() {
// Disable tracing.
c.isTraceEnabled = false
}
// requestMetadata - is container for all the values to make a request.
type requestMetadata struct {
// If set newRequest presigns the URL.
@ -178,6 +206,66 @@ type requestMetadata struct {
contentMD5Bytes []byte
}
// dumpHTTP - dump HTTP request and response.
func (c Client) dumpHTTP(req *http.Request, resp *http.Response) error {
// Starts http dump.
_, err := fmt.Fprintln(c.traceOutput, "---------START-HTTP---------")
if err != nil {
return err
}
// Only display request header.
reqTrace, err := httputil.DumpRequestOut(req, false)
if err != nil {
return err
}
// Write request to trace output.
_, err = fmt.Fprint(c.traceOutput, string(reqTrace))
if err != nil {
return err
}
// Only display response header.
respTrace, err := httputil.DumpResponse(resp, false)
if err != nil {
return err
}
// Write response to trace output.
_, err = fmt.Fprint(c.traceOutput, strings.TrimSuffix(string(respTrace), "\r\n"))
if err != nil {
return err
}
// Ends the http dump.
_, err = fmt.Fprintln(c.traceOutput, "---------END-HTTP---------")
if err != nil {
return err
}
// Returns success.
return nil
}
// do - execute http request.
func (c Client) do(req *http.Request) (*http.Response, error) {
// execute the request.
resp, err := c.httpClient.Do(req)
if err != nil {
return resp, err
}
// If trace is enabled, dump http request and response.
if c.isTraceEnabled {
err = c.dumpHTTP(req, resp)
if err != nil {
return nil, err
}
}
return resp, nil
}
// newRequest - instantiate a new HTTP request for a given method.
func (c Client) newRequest(method string, metadata requestMetadata) (*http.Request, error) {
// If no method is supplied default to 'POST'.
if method == "" {
@ -344,4 +432,8 @@ type CloudStorageClient interface {
// Set custom transport.
SetCustomTransport(customTransport http.RoundTripper)
// HTTP tracing methods.
TraceOn(traceOutput io.Writer) error
TraceOff()
}

View File

@ -1,9 +1,27 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio_test
import (
"bytes"
"io"
"io/ioutil"
"math/rand"
"net/http"
"os"
"testing"
"time"
@ -35,7 +53,11 @@ func randString(n int, src rand.Source) string {
return string(b[0:30])
}
func TestFunctional(t *testing.T) {
func TestGetObjectPartialFunctional(t *testing.T) {
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"play.minio.io:9002",
"Q3AM3UQ867SPQQA43P2F",
@ -47,55 +69,204 @@ func TestFunctional(t *testing.T) {
}
// Set user agent.
c.SetAppInfo("Test", "0.1.0")
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Enable tracing, write to stdout.
// c.TraceOn(nil)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// generate data
buf := make([]byte, rand.Intn(1<<20))
// save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), int64(len(buf)), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
// read the data back
r, st, err := c.GetObjectPartial(bucketName, objectName)
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if st.Size != int64(len(buf)) {
t.Fatalf("Error: number of bytes in stat does not match, want %v, got %v\n",
len(buf), st.Size)
}
offset := int64(2048)
// read directly
buf2 := make([]byte, 512)
buf3 := make([]byte, 512)
buf4 := make([]byte, 512)
m, err := r.ReadAt(buf2, offset)
if err != nil {
t.Fatal("Error:", err, st.Size, len(buf2), offset)
}
if m != len(buf2) {
t.Fatalf("Error: ReadAt read shorter bytes before reaching EOF, want %v, got %v\n", m, len(buf2))
}
m, err = r.ReadAt(buf3, offset)
if err != nil {
t.Fatal("Error:", err, st.Size, len(buf3), offset)
}
if m != len(buf3) {
t.Fatalf("Error: ReadAt read shorter bytes before reaching EOF, want %v, got %v\n", m, len(buf3))
}
if !bytes.Equal(buf2, buf3) {
t.Fatal("Error: Incorrect read between two ReadAt from same offset.")
}
m, err = r.ReadAt(buf4, offset)
if err != nil {
t.Fatal("Error:", err, st.Size, len(buf4), offset)
}
if m != len(buf4) {
t.Fatalf("Error: ReadAt read shorter bytes before reaching EOF, want %v, got %v\n", m, len(buf4))
}
if !bytes.Equal(buf2, buf4) {
t.Fatal("Error: Incorrect read between two ReadAt from same offset.")
}
buf5 := make([]byte, n)
// Read the whole object.
m, err = r.ReadAt(buf5, 0)
if err != nil {
if err != io.EOF {
t.Fatal("Error:", err, len(buf5))
}
}
if m != len(buf5) {
t.Fatalf("Error: ReadAt read shorter bytes before reaching EOF, want %v, got %v\n", m, len(buf5))
}
if !bytes.Equal(buf, buf5) {
t.Fatal("Error: Incorrect data read in GetObject, than what was previously upoaded.")
}
buf6 := make([]byte, n+1)
// Read the whole object and beyond.
_, err = r.ReadAt(buf6, 0)
if err != nil {
if err != io.EOF {
t.Fatal("Error:", err, len(buf6))
}
}
}
func TestFunctional(t *testing.T) {
// Seed random based on current time.
rand.Seed(time.Now().Unix())
c, err := minio.New(
"play.minio.io:9002",
"Q3AM3UQ867SPQQA43P2F",
"zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Enable tracing, write to stdout.
// c.TraceOn(nil)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// generate a random file name.
fileName := randString(60, rand.NewSource(time.Now().UnixNano()))
file, err := os.Create(fileName)
if err != nil {
t.Fatal("Error:", err)
}
for i := 0; i < 10; i++ {
file.WriteString(fileName)
var totalSize int64
for i := 0; i < 3; i++ {
buf := make([]byte, rand.Intn(1<<19))
n, err := file.Write(buf)
if err != nil {
t.Fatal("Error:", err)
}
totalSize += int64(n)
}
file.Close()
// verify if bucket exits and you have access.
err = c.BucketExists(bucketName)
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// make the bucket 'public read/write'.
err = c.SetBucketACL(bucketName, "public-read-write")
if err != nil {
t.Fatal("Error:", err)
}
// get the previously set acl.
acl, err := c.GetBucketACL(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
// acl must be 'public read/write'.
if acl != minio.BucketACL("public-read-write") {
t.Fatal("Error:", acl)
}
_, err = c.ListBuckets()
// list all buckets.
buckets, err := c.ListBuckets()
if err != nil {
t.Fatal("Error:", err)
}
objectName := bucketName + "Minio"
reader := bytes.NewReader([]byte("Hello World!"))
// Verify if previously created bucket is listed in list buckets.
bucketFound := false
for _, bucket := range buckets {
if bucket.Name == bucketName {
bucketFound = true
}
}
// If bucket not found error out.
if !bucketFound {
t.Fatal("Error: bucket ", bucketName, "not found")
}
objectName := bucketName + "unique"
// generate data
buf := make([]byte, rand.Intn(1<<19))
reader := bytes.NewReader(buf)
n, err := c.PutObject(bucketName, objectName, reader, int64(reader.Len()), "")
if err != nil {
t.Fatal("Error: ", err)
}
if n != int64(len([]byte("Hello World!"))) {
if n != int64(len(buf)) {
t.Fatal("Error: bad length ", n, reader.Len())
}
@ -104,26 +275,75 @@ func TestFunctional(t *testing.T) {
t.Fatal("Error: ", err)
}
n, err = c.FPutObject(bucketName, objectName+"-f", fileName, "text/plain")
if err != nil {
t.Fatal("Error: ", err)
}
if n != int64(10*len(fileName)) {
t.Fatal("Error: bad length ", n, int64(10*len(fileName)))
}
err = c.FGetObject(bucketName, objectName+"-f", fileName+"-f")
if err != nil {
t.Fatal("Error: ", err)
}
newReadBytes, err := ioutil.ReadAll(newReader)
if err != nil {
t.Fatal("Error: ", err)
}
if !bytes.Equal(newReadBytes, []byte("Hello World!")) {
t.Fatal("Error: bytes invalid.")
if !bytes.Equal(newReadBytes, buf) {
t.Fatal("Error: bytes mismatch.")
}
n, err = c.FPutObject(bucketName, objectName+"-f", fileName, "text/plain")
if err != nil {
t.Fatal("Error: ", err)
}
if n != totalSize {
t.Fatal("Error: bad length ", n, totalSize)
}
err = c.FGetObject(bucketName, objectName+"-f", fileName+"-f")
if err != nil {
t.Fatal("Error: ", err)
}
presignedGetURL, err := c.PresignedGetObject(bucketName, objectName, 3600*time.Second)
if err != nil {
t.Fatal("Error: ", err)
}
resp, err := http.Get(presignedGetURL)
if err != nil {
t.Fatal("Error: ", err)
}
if resp.StatusCode != http.StatusOK {
t.Fatal("Error: ", resp.Status)
}
newPresignedBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal("Error: ", err)
}
if !bytes.Equal(newPresignedBytes, buf) {
t.Fatal("Error: bytes mismatch.")
}
presignedPutURL, err := c.PresignedPutObject(bucketName, objectName+"-presigned", 3600*time.Second)
if err != nil {
t.Fatal("Error: ", err)
}
buf = make([]byte, rand.Intn(1<<20))
req, err := http.NewRequest("PUT", presignedPutURL, bytes.NewReader(buf))
if err != nil {
t.Fatal("Error: ", err)
}
httpClient := &http.Client{}
resp, err = httpClient.Do(req)
if err != nil {
t.Fatal("Error: ", err)
}
newReader, _, err = c.GetObject(bucketName, objectName+"-presigned")
if err != nil {
t.Fatal("Error: ", err)
}
newReadBytes, err = ioutil.ReadAll(newReader)
if err != nil {
t.Fatal("Error: ", err)
}
if !bytes.Equal(newReadBytes, buf) {
t.Fatal("Error: bytes mismatch.")
}
err = c.RemoveObject(bucketName, objectName)
@ -134,6 +354,10 @@ func TestFunctional(t *testing.T) {
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveObject(bucketName, objectName+"-presigned")
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {

View File

@ -18,7 +18,6 @@ install:
- go env
- go get -u github.com/golang/lint/golint
- go get -u golang.org/x/tools/cmd/vet
- go get -u github.com/fzipp/gocyclo
- go get -u github.com/remyoudompheng/go-misc/deadcode
# to run your custom scripts instead of automatic MSBuild
@ -26,7 +25,6 @@ build_script:
- go vet ./...
- gofmt -s -l .
- golint github.com/minio/minio-go...
- gocyclo -over 30 .
- deadcode
- go test
- go test -race

View File

@ -82,7 +82,7 @@ func (c Client) getBucketLocation(bucketName string) (string, error) {
}
// Initiate the request.
resp, err := c.httpClient.Do(req)
resp, err := c.do(req)
defer closeResponse(resp)
if err != nil {
return "", err