Checkpoint for implementation of multipart upload

Check issue #142 for details

Code is operational, but not quite ready for
prime time; it still needs some cleanup.


git-svn-id: http://s3fs.googlecode.com/svn/trunk@297 df820570-a93a-0410-bd06-b72b767a4274
mooredan@suncup.net 2010-12-28 04:15:23 +00:00
parent 784d51d805
commit acc7363433
4 changed files with 718 additions and 37 deletions

configure.ac

@@ -1,7 +1,7 @@
dnl Process this file with autoconf to produce a configure script.
AC_PREREQ(2.59)
AC_INIT(s3fs, 1.30)
AC_INIT(s3fs, 1.31)
AC_CANONICAL_SYSTEM

src/s3fs.cpp

@@ -71,6 +71,15 @@ struct BodyStruct {
size_t size;
};
// Memory structure for POST
struct WriteThis {
const char *readptr;
int sizeleft;
};
typedef struct curlhll {
CURL *handle;
struct curlhll *next;
@@ -148,6 +157,10 @@ static int my_curl_progress(
// timeout?
if (now - curl_times[curl] > readwrite_timeout) {
pthread_mutex_unlock( &curl_handles_lock );
syslog(LOG_ERR, "timeout now: %i curl_times[curl]: %i readwrite_timeout: %i",
now, curl_times[curl], readwrite_timeout);
return CURLE_ABORTED_BY_CALLBACK;
}
}
@@ -560,6 +573,7 @@ static void locate_bundle(void) {
static int my_curl_easy_perform(CURL* curl, BodyStruct* body = NULL, FILE* f = 0) {
// char* url = new char[128];
char url[256];
time_t now;
char* ptr_url = url;
// char* url = (char *)malloc(256);
// if (url) {
@@ -606,7 +620,7 @@ static int my_curl_easy_perform(CURL* curl, BodyStruct* body = NULL, FILE* f = 0
if (responseCode >= 500) {
syslog(LOG_ERR, "###HTTP response=%ld", responseCode);
sleep(10);
sleep(4);
break;
}
@@ -652,22 +666,24 @@ static int my_curl_easy_perform(CURL* curl, BodyStruct* body = NULL, FILE* f = 0
case CURLE_COULDNT_CONNECT:
syslog(LOG_ERR, "### CURLE_COULDNT_CONNECT");
sleep(10);
sleep(4);
break;
case CURLE_GOT_NOTHING:
syslog(LOG_ERR, "### CURLE_GOT_NOTHING");
sleep(10);
sleep(4);
break;
case CURLE_ABORTED_BY_CALLBACK:
syslog(LOG_ERR, "### CURLE_ABORTED_BY_CALLBACK");
sleep(10);
sleep(4);
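// reset this handle's progress timer so my_curl_progress() does not
// immediately abort the retry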
now = time(0);
curl_times[curl] = now;
break;
case CURLE_PARTIAL_FILE:
syslog(LOG_ERR, "### CURLE_PARTIAL_FILE");
sleep(10);
sleep(4);
break;
case CURLE_SSL_CACERT:
@@ -778,6 +794,14 @@ string get_date() {
return buf;
}
string get_day_after_tomorrow() {
char buf[100];
time_t t = time(NULL);
t = t + (2 * 24 * 60 * 60);
strftime(buf, sizeof(buf), "%a, %d %b %Y %H:%M:%S GMT", gmtime(&t));
return buf;
}
/**
* Returns the Amazon AWS signature for the given parameters.
*
@@ -914,6 +938,34 @@ static size_t WriteMemoryCallback(void *ptr, size_t blockSize, size_t numBlocks,
return realsize;
}
// read_callback
// http://curl.haxx.se/libcurl/c/post-callback.html
static size_t read_callback(void *ptr, size_t size, size_t nmemb, void *userp)
{
struct WriteThis *pooh = (struct WriteThis *)userp;
if(size*nmemb < 1)
return 0;
if(pooh->sizeleft) {
*(char *)ptr = pooh->readptr[0]; /* copy one single byte */
pooh->readptr++; /* advance pointer */
pooh->sizeleft--; /* less data left */
return 1; /* we return 1 byte at a time! */
}
return 0; /* no more data left to deliver */
}
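// A bulk variant of the callback above (a sketch only; not used by this
// change). libcurl invokes the read callback repeatedly until it returns 0,
// so copying min(size*nmemb, sizeleft) per call instead of a single byte
// saves calls on large POST bodies while honoring the same WriteThis contract.
static size_t read_callback_bulk(void *ptr, size_t size, size_t nmemb, void *userp)
{
struct WriteThis *pooh = (struct WriteThis *)userp;
size_t n = size * nmemb;
if(n > (size_t)pooh->sizeleft)
n = (size_t)pooh->sizeleft;
memcpy(ptr, pooh->readptr, n); /* copy as much as fits */
pooh->readptr += n; /* advance pointer */
pooh->sizeleft -= (int)n; /* less data left */
return n; /* 0 once everything is delivered */
}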
static size_t header_callback(void *data, size_t blockSize, size_t numBlocks, void *userPtr) {
headers_t* headers = reinterpret_cast<headers_t*>(userPtr);
string header(reinterpret_cast<char*>(data), blockSize * numBlocks);
@@ -1022,6 +1074,10 @@ int get_headers(const char* path, headers_t& meta) {
* get_local_fd
*/
int get_local_fd(const char* path) {
if(foreground)
cout << " get_local_fd[path=" << path << "]" << endl;
CURL *curl = NULL;
int result;
string resource(urlEncode(service_path + bucket + path));
@@ -1122,7 +1178,7 @@ int get_local_fd(const char* path) {
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get());
if(foreground)
cout << "downloading[path=" << path << "][fd=" << fd << "]" << endl;
cout << " downloading[path=" << path << "][fd=" << fd << "]" << endl;
string my_url = prepare_url(url.c_str());
curl_easy_setopt(curl, CURLOPT_URL, my_url.c_str());
@@ -1151,6 +1207,7 @@
* @return fuse return code
*/
static int put_headers(const char* path, headers_t meta) {
CURL *curl = NULL;
string resource = urlEncode(service_path + bucket + path);
@@ -1158,12 +1215,18 @@ static int put_headers(const char* path, headers_t meta) {
struct BodyStruct body;
int result;
if(foreground)
cout << " put_headers[path=" << path << "]" << endl;
body.text = (char *)malloc(1); /* will be grown as needed by realloc in WriteMemoryCallback */
body.size = 0; /* no data at this point */
curl = create_curl_handle();
// curl_easy_setopt(curl, CURLOPT_FAILONERROR, true);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true);
curl_easy_setopt(curl, CURLOPT_VERBOSE, true);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&body);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
@@ -1207,13 +1270,17 @@ static int put_headers(const char* path, headers_t meta) {
if(debug) syslog(LOG_DEBUG, "copy path=%s", path);
if(foreground)
cout << "copying[path=" << path << "]" << endl;
cout << " copying[path=" << path << "]" << endl;
string my_url = prepare_url(url.c_str());
curl_easy_setopt(curl, CURLOPT_URL, my_url.c_str());
result = my_curl_easy_perform(curl, &body);
if(body.size > 0) {
printf("body.text: %s\n", body.text);
}
if(body.text) {
free(body.text);
}
@@ -1227,29 +1294,26 @@ static int put_headers(const char* path, headers_t meta) {
return 0;
}
/**
* create or update s3 object
* @return fuse return code
*/
static int put_local_fd(const char* path, headers_t meta, int fd) {
CURL *curl = NULL;
string resource = urlEncode(service_path + bucket + path);
string url = host + resource;
struct BodyStruct body;
int result;
body.text = (char *)malloc(1); /* will be grown as needed by the realloc above */
body.size = 0; /* no data at this point */
static int put_local_fd_small_file(const char* path, headers_t meta, int fd) {
struct stat st;
if (fstat(fd, &st) == -1) {
YIKES(-errno);
}
CURL *curl = NULL;
string resource = urlEncode(service_path + bucket + path);
string url = host + resource;
struct BodyStruct body;
int result;
body.text = (char *)malloc(1);
body.size = 0;
curl = create_curl_handle();
// curl_easy_setopt(curl, CURLOPT_FAILONERROR, true);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true);
// curl_easy_setopt(curl, CURLOPT_VERBOSE, true);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&body);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
@@ -1296,13 +1360,15 @@ static int put_local_fd(const char* path, headers_t meta, int fd) {
if(debug) syslog(LOG_DEBUG, "upload path=%s size=%llu", path, st.st_size);
if(foreground)
cout << "uploading[path=" << path << "][fd=" << fd << "][size="<<st.st_size <<"]" << endl;
cout << " uploading[path=" << path << "][fd=" << fd << "][size="<<st.st_size <<"]" << endl;
string my_url = prepare_url(url.c_str());
curl_easy_setopt(curl, CURLOPT_URL, my_url.c_str());
result = my_curl_easy_perform(curl, &body, f);
//////////////////////////////////
if(body.text) {
free(body.text);
}
@@ -1316,6 +1382,603 @@ static int put_local_fd(const char* path, headers_t meta, int fd) {
return 0;
}
static int put_local_fd_big_file(const char* path, headers_t meta, int fd) {
CURL *curl = NULL;
struct stat st;
string resource;
string url;
struct BodyStruct body;
struct BodyStruct header;
string ETag;
int result;
unsigned long lSize;
FILE* pSourceFile;
FILE* pPartFile;
char * buffer;
unsigned long lBufferSize = 0;
size_t bytesRead;
size_t bytesWritten;
char * partFileName;
int partNumber;
string date;
string expires;
string raw_date;
string auth;
string my_url;
const char * UploadId;
string uploadId;
struct curl_slist *slist=NULL;
vector <string> etags;
int i;
if(foreground)
cout << " put_local_fd_big_file[path=" << path << "][fd=" << fd << "]" << endl;
// Initialization of variables
body.text = (char *)malloc(1);
body.size = 0;
curl = NULL;
etags.clear();
if (fstat(fd, &st) == -1) {
YIKES(-errno);
}
///////////////////////////////
// Initiate Multipart Upload
///////////////////////////////
// Example :
//
// POST /example-object?uploads HTTP/1.1
// Host: example-bucket.s3.amazonaws.com
// Date: Mon, 1 Nov 2010 20:34:56 GMT
// Authorization: AWS VGhpcyBtZXNzYWdlIHNpZ25lZCBieSBlbHZpbmc=
if(foreground)
cout << " initiate multipart upload [path=" << path << "][size=" << st.st_size << "]" << endl;
resource = urlEncode(service_path + bucket + path);
resource.append("?uploads");
url = host + resource;
// url.append("?uploads");
my_url = prepare_url(url.c_str());
curl = create_curl_handle();
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true);
// curl_easy_setopt(curl, CURLOPT_VERBOSE, true);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&body);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(curl, CURLOPT_POST, true);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, 0);
date.assign("Date: ");
raw_date = get_date();
date.append(raw_date);
slist = curl_slist_append(slist, date.c_str());
// expires.assign("Expires: ");
// expires.append(get_day_after_tomorrow());
// slist = curl_slist_append(slist, expires.c_str());
slist = curl_slist_append(slist, "Accept:");
slist = curl_slist_append(slist, "Content-Length:");
string ContentType;
ContentType.assign("Content-Type: ");
string ctype_data;
ctype_data.assign(lookupMimeType(path));
ContentType.append(ctype_data);
// slist = curl_slist_append(slist, "Content-Type:");
slist = curl_slist_append(slist, ContentType.c_str());
// x-amz headers: (a) alphabetical order and (b) no spaces after colon
// meta["x-amz-acl"] = default_acl;
string acl;
acl.assign("x-amz-acl:");
acl.append(default_acl);
slist = curl_slist_append(slist, acl.c_str());
// printf("ContentType: %s\n", (meta["Content-Type"]).c_str());
// printf("default_acl: %s\n", default_acl.c_str());
for (headers_t::iterator iter = meta.begin(); iter != meta.end(); ++iter) {
string key = (*iter).first;
string value = (*iter).second;
//
// printf("key: %s value: %s\n", key.c_str(), value.c_str());
if (key.substr(0,10) == "x-amz-meta") {
string entry;
entry.assign(key);
entry.append(":");
entry.append(value);
slist = curl_slist_append(slist, entry.c_str());
}
}
if (use_rrs.substr(0,1) == "1") {
slist = curl_slist_append(slist, "x-amz-storage-class:REDUCED_REDUNDANCY");
}
if (public_bucket.substr(0,1) != "1") {
auth.assign("Authorization: AWS ");
auth.append(AWSAccessKeyId);
auth.append(":");
// auth.append(calc_signature("POST", "", raw_date, slist, resource));
auth.append(calc_signature("POST", ctype_data, raw_date, slist, resource));
slist = curl_slist_append(slist, auth.c_str());
}
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, slist);
curl_easy_setopt(curl, CURLOPT_URL, my_url.c_str());
result = my_curl_easy_perform(curl, &body);
// printf("body.text:\n%s\n", body.text);
curl_slist_free_all(slist);
destroy_curl_handle(curl);
if(result != 0) {
printf("body.text:\n%s\n", body.text);
if(body.text) free(body.text);
return result;
}
// XML returns UploadId
// Parse XML body for UploadId
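// The response body parsed here looks like (per the AWS docs, values
// abbreviated):
//
// <InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
// <Bucket>example-bucket</Bucket>
// <Key>example-object</Key>
// <UploadId>...base64 upload id...</UploadId>
// </InitiateMultipartUploadResult>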
uploadId.clear();
xmlDocPtr doc = xmlReadMemory(body.text, body.size, "", NULL, 0);
if (doc != NULL && doc->children != NULL) {
for (xmlNodePtr cur_node = doc->children->children;
cur_node != NULL;
cur_node = cur_node->next) {
// string cur_node_name(reinterpret_cast<const char *>(cur_node->name));
// printf("cur_node_name: %s\n", cur_node_name.c_str());
if (cur_node->type == XML_ELEMENT_NODE) {
string elementName = reinterpret_cast<const char*>(cur_node->name);
// printf("elementName: %s\n", elementName.c_str());
if (cur_node->children != NULL) {
if (cur_node->children->type == XML_TEXT_NODE) {
if (elementName == "UploadId") {
uploadId = reinterpret_cast<const char *>(cur_node->children->content);
}
}
}
}
} // for (xmlNodePtr cur_node = doc->children->children;
} // if (doc != NULL && doc->children != NULL) {
xmlFreeDoc(doc);
// printf("uploadId: %s\n", uploadId.c_str());
// clean up
if(body.text) free(body.text);
body.size = 0;
curl = NULL;
if(uploadId.size() == 0) {
syslog(LOG_ERR, "Could not determine UploadId");
return(-EIO);
}
///////////////////////////////
// Upload Parts
///////////////////////////////
// PUT /my-movie.m2ts?partNumber=1&uploadId=VCVsb2FkIElEIGZvciBlbZZpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZR
// HTTP/1.1
// Host: example-bucket.s3.amazonaws.com
// Date: Mon, 1 Nov 2010 20:34:56 GMT
// Content-Length: 10485760
// Content-MD5: pUNXr/BjKK5G2UKvaRRrOA==
// Authorization: AWS VGhpcyBtZXNzYWdlIHNpZ25lZGGieSRlbHZpbmc=
//
// ***part data omitted***
// CURLOPT_UPLOAD to be used
// cycle through open fd, pulling off 10MB chunks at a time
// Open the source file
pSourceFile = fdopen(fd, "rb");
if (pSourceFile == NULL) {
syslog(LOG_ERR, "%d###result=%d", __LINE__, errno); \
return(-errno);
}
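// Note: fdopen() wraps fd without duplicating it, so an fclose(pSourceFile)
// would also close the caller's descriptor; the stream stays open for the
// part-upload loop below.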
// Source successfully opened
// obtain file size:
// error check this
fseek (pSourceFile , 0 , SEEK_END);
lSize = ftell (pSourceFile);
rewind (pSourceFile);
// Does it match stat's info?
if (lSize != st.st_size) {
syslog(LOG_ERR, "%d ### Sizes do not match lSize: %i st_size: %u\n", __LINE__, lSize, st.st_size);
// if(pSourceFile != NULL) fclose(pSourceFile);
// return(-EIO);
}
// printf("lSize: %i\n", lSize);
// printf("st_size: %i\n", st.st_size);
lSize = (unsigned long)st.st_size;
lBufferSize = 0;
partNumber = 0;
while(lSize > 0) {
partNumber++;
if(lSize >= 10485760) {
lBufferSize = 10485760;
} else {
lBufferSize = lSize;
}
lSize = lSize - lBufferSize;
buffer = (char*) malloc (sizeof(char)*lBufferSize);
if (buffer == NULL) {
syslog(LOG_CRIT, "Could not allocation memory for buffer\n");
exit(1);
}
// copy the file portion into the buffer:
bytesRead = fread (buffer, 1, lBufferSize, pSourceFile);
if (bytesRead != lBufferSize) {
syslog(LOG_ERR, "%d ### bytesRead:%i does not match lBufferSize: %i\n",
__LINE__, bytesRead, lBufferSize);
if(buffer) free(buffer);
return(-EIO);
}
// printf("bytesRead: %i\n", bytesRead);
// source file portion is now in memory
// open a temporary file for upload
partFileName = tmpnam (NULL);
if(partFileName == NULL) {
syslog(LOG_ERR, "%d ### Could not create temporary file\n", __LINE__);
if(buffer) free(buffer);
return(-errno);
}
// printf ("Tempfile created: %s\n", partFileName);
pPartFile = fopen(partFileName, "wb");
if(pPartFile == NULL) {
syslog(LOG_ERR, "%d ### Could not open temporary file: errno %i\n",
__LINE__, errno);
if(buffer) free(buffer);
return(-errno);
}
// printf ("Tempfile opened: %s\n", partFileName);
// Copy buffer to tmpfile
bytesWritten = fwrite (buffer, 1, (size_t)lBufferSize, pPartFile);
if (bytesWritten != lBufferSize) {
syslog(LOG_ERR, "%d ### bytesWritten:%i does not match lBufferSize: %i\n",
__LINE__, bytesWritten, lBufferSize);
fclose(pPartFile);
if(buffer) free(buffer);
return(-EIO);
}
// printf ("Tempfile written: %s\n", partFileName);
fclose(pPartFile);
if(buffer) free(buffer);
// Now upload the file as the nth part, need upload ID!
// printf("now upload part %i filename: %s size: %i\n", partNumber, partFileName, bytesWritten);
if(foreground)
cout << " multipart upload [path=" << path << "][part=" << partNumber << "][bytes=" << bytesWritten << "]" << endl;
// PUT /ObjectName?partNumber=PartNumber&uploadId=UploadId HTTP/1.1
// Host: BucketName.s3.amazonaws.com
// Date: date
// Content-Length: Size
// Authorization: Signature
// PUT /my-movie.m2ts?partNumber=1&uploadId=VCVsb2FkIElEIGZvciBlbZZpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZR HTTP/1.1
// Host: example-bucket.s3.amazonaws.com
// Date: Mon, 1 Nov 2010 20:34:56 GMT
// Content-Length: 10485760
// Content-MD5: pUNXr/BjKK5G2UKvaRRrOA==
// Authorization: AWS VGhpcyBtZXNzYWdlIHNpZ25lZGGieSRlbHZpbmc=
pPartFile = fopen(partFileName, "rb");
if (pPartFile == NULL) {
YIKES(-errno);
}
resource = urlEncode(service_path + bucket + path);
resource.append("?partNumber=");
resource.append(IntToStr(partNumber));
resource.append("&uploadId=");
resource.append(uploadId);
url = host + resource;
my_url = prepare_url(url.c_str());
// printf("my_url: %s\n", my_url.c_str());
body.text = (char *)malloc(1);
body.size = 0;
header.text = (char *)malloc(1);
header.size = 0;
curl = create_curl_handle();
// printf("got curl handle\n");
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true);
// curl_easy_setopt(curl, CURLOPT_VERBOSE, true);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&body);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(curl, CURLOPT_HEADERDATA, (void *)&header);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, WriteMemoryCallback);
curl_easy_setopt(curl, CURLOPT_UPLOAD, true); // HTTP PUT
curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(bytesWritten)); // Content-Length
curl_easy_setopt(curl, CURLOPT_INFILE, pPartFile);
date.assign("Date: ");
raw_date = get_date();
date.append(raw_date);
slist = NULL;
slist = curl_slist_append(slist, date.c_str());
slist = curl_slist_append(slist, "Accept:");
// printf("now got slist\n");
if (public_bucket.substr(0,1) != "1") {
auth.assign("Authorization: AWS ");
auth.append(AWSAccessKeyId);
auth.append(":");
auth.append(calc_signature("PUT", "", raw_date, slist, resource));
slist = curl_slist_append(slist, auth.c_str());
}
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, slist);
curl_easy_setopt(curl, CURLOPT_URL, my_url.c_str());
result = my_curl_easy_perform(curl, &body, pPartFile);
curl_slist_free_all(slist);
destroy_curl_handle(curl);
fclose(pPartFile);
if( result != 0 && body.size !=0 ) {
printf("body.text:\n%s\n", body.text);
}
if(result != 0) {
if(body.text) free(body.text);
return result;
}
// printf("BEGIN header.text:\n%s\nEND header.text\n", header.text);
// Error check this section
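// The parse below assumes the response headers contain an "ETag:" line
// followed by a "Content-Length:" line; ptr1 + 6 skips past "ETag: " and
// the value ends two bytes (the CRLF) before ptr2. If either header is
// missing, strstr() returns NULL and the pointer arithmetic will crash.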
char *ptr1;
char *ptr2;
ptr1 = strstr(header.text, "ETag:");
ptr2 = strstr(header.text, "Content-Length:");
ptr1 = ptr1 + 6;
ETag.assign(ptr1, (size_t)(ptr2 - ptr1 - 2));
// printf("ETag: %s\n", ETag.c_str());
// Got the ETag, now store
etags.push_back(ETag);
// Clean up
if(body.text) free(body.text);
if(header.text) free(header.text);
result = remove(partFileName);
if (result != 0) {
syslog(LOG_ERR, "Could not remove temporary file: %s errno: %i\n",
partFileName, errno);
return(-errno);
}
} // while(lSize > 0) {
///////////////////////////////
// Complete Multipart Upload
///////////////////////////////
if(foreground)
cout << " complete multipart upload [path=" << path << "]" << endl;
string postContent;
postContent.clear();
postContent.append("<CompleteMultipartUpload>\n");
for (i = 0; i < partNumber; i++) {
postContent.append(" <Part>\n");
postContent.append(" <PartNumber>");
postContent.append(IntToStr(i+1));
postContent.append("</PartNumber>\n");
postContent.append(" <ETag>");
postContent.append(etags[i]);
postContent.append("</ETag>\n");
postContent.append(" </Part>\n");
}
postContent.append("</CompleteMultipartUpload>\n");
// printf("%s", postContent.c_str());
// printf("size: %i\n", postContent.size());
char *pData;
struct WriteThis pooh;
pData = (char *)malloc( postContent.size() + 1);
if(pData == NULL) {
syslog(LOG_CRIT, "Could not allocate memory for pData\n");
return(-ENOMEM);
}
pooh.readptr = pData;
pooh.sizeleft = postContent.size();
strcpy(pData, postContent.c_str());
postContent.clear();
// printf("%s", pData);
resource = urlEncode(service_path + bucket + path);
resource.append("?uploadId=");
resource.append(uploadId);
url = host + resource;
my_url = prepare_url(url.c_str());
body.text = (char *)malloc(1);
body.size = 0;
curl = create_curl_handle();
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true);
// curl_easy_setopt(curl, CURLOPT_VERBOSE, true);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&body);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(curl, CURLOPT_POST, true);
// curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, 0);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)pooh.sizeleft); // CURLOPT_POSTFIELDSIZE takes a long
curl_easy_setopt(curl, CURLOPT_READFUNCTION, read_callback);
curl_easy_setopt(curl, CURLOPT_READDATA, &pooh);
date.assign("Date: ");
raw_date = get_date();
date.append(raw_date);
slist = NULL;
slist = curl_slist_append(slist, date.c_str());
slist = curl_slist_append(slist, "Accept:");
slist = curl_slist_append(slist, "Content-Type:");
// slist = curl_slist_append(slist, "Content-Length:");
// if (use_rrs.substr(0,1) == "1") {
// slist = curl_slist_append(slist, "x-amz-storage-class:REDUCED_REDUNDANCY");
// }
if (public_bucket.substr(0,1) != "1") {
auth.assign("Authorization: AWS ");
auth.append(AWSAccessKeyId);
auth.append(":");
auth.append(calc_signature("POST", "", raw_date, slist, resource));
slist = curl_slist_append(slist, auth.c_str());
}
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, slist);
curl_easy_setopt(curl, CURLOPT_URL, my_url.c_str());
result = my_curl_easy_perform(curl, &body);
// printf("body.text:\n%s\n", body.text);
curl_slist_free_all(slist);
destroy_curl_handle(curl);
free(pData);
if(result != 0) {
if(body.text) free(body.text);
return result;
}
/*
result=put_headers(path, meta);
if(result != 0) {
return result;
}
*/
// Abort Multipart Upload ??
// if(body.text) {
// free(body.text);
// }
return 0;
}
/**
* create or update s3 object
* @return fuse return code
*/
static int put_local_fd(const char* path, headers_t meta, int fd) {
if(foreground)
cout << " put_local_fd[path=" << path << "][fd=" << fd << "]" << endl;
int result;
struct stat st;
if (fstat(fd, &st) == -1) {
YIKES(-errno);
}
/////////////////////////////////////////////////////////
// Make decision to do multi upload (or not)
// based upon file size
//
// According to the AWS spec:
// - 1 to 10,000 parts are allowed
// - minimum size of parts is 5MB (except for the last part)
//
// For our application, we will define part size to be 10MB (10 * 2^20 Bytes)
// maximum file size will be ~2GB (2 ** 31 Bytes)
//
// Initially uploads will be done serially
//
// If file is > 20MB, then multipart will kick in
/////////////////////////////////////////////////////////////
if(st.st_size > 2147000000) { // ~2GB
// close f ?
return -ENOTSUP;
}
if(st.st_size > 20971520) { // 20MB
result = put_local_fd_big_file(path, meta, fd);
} else {
result = put_local_fd_small_file(path, meta, fd);
}
return result;
}
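// Illustrative sketch (not called anywhere): the part-slicing arithmetic
// used by put_local_fd_big_file above. A 25MB file, for example, yields
// parts of 10MB, 10MB and 5MB.
static void demo_part_slicing(unsigned long lSize) {
int partNumber = 0;
unsigned long lBufferSize;
while(lSize > 0) {
partNumber++;
if(lSize >= 10485760) {
lBufferSize = 10485760; // full 10MB part
} else {
lBufferSize = lSize; // short final part
}
lSize = lSize - lBufferSize;
printf("part %i: %lu bytes\n", partNumber, lBufferSize);
}
}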
static int s3fs_getattr(const char *path, struct stat *stbuf) {
@@ -1324,7 +1987,7 @@ static int s3fs_getattr(const char *path, struct stat *stbuf) {
CURL *curl;
if(foreground)
cout << "getattr[path=" << path << "]" << endl;
cout << "s3fs_getattr[path=" << path << "]" << endl;
memset(stbuf, 0, sizeof(struct stat));
if (strcmp(path, "/") == 0) {
@@ -1537,7 +2200,7 @@ static int create_file_object(const char *path, mode_t mode) {
int result;
if(foreground)
cout << "create_file_object[path=" << path << "][mode=" << mode << "]" << endl;
cout << " create_file_object[path=" << path << "][mode=" << mode << "]" << endl;
string resource = urlEncode(service_path + bucket + path);
string url = host + resource;
@@ -1586,7 +2249,7 @@ static int create_file_object(const char *path, mode_t mode) {
static int s3fs_mknod(const char *path, mode_t mode, dev_t rdev) {
int result;
if(foreground)
cout << "mknod[path=" << path << "][mode=" << mode << "]" << endl;
cout << "s3fs_mknod[path=" << path << "][mode=" << mode << "]" << endl;
// see man 2 mknod
// If pathname already exists, or is a symbolic link, this call fails with an EEXIST error.
@@ -1609,7 +2272,7 @@ static int s3fs_create(const char *path, mode_t mode, struct fuse_file_info *fi)
headers_t meta;
if(foreground)
cout << "create[path=" << path << "][mode=" << mode << "]" << "[flags=" << fi->flags << "]" << endl;
cout << "s3fs_create[path=" << path << "][mode=" << mode << "]" << "[flags=" << fi->flags << "]" << endl;
result = create_file_object(path, mode);
@@ -1658,6 +2321,9 @@ static int s3fs_mkdir(const char *path, mode_t mode) {
headers.append("x-amz-meta-mode:" + str(mode));
headers.append("x-amz-meta-mtime:" + str(time(NULL)));
headers.append("x-amz-meta-uid:" + str(getuid()));
if (use_rrs.substr(0,1) == "1") {
headers.append("x-amz-storage-class:REDUCED_REDUNDANCY");
}
if (public_bucket.substr(0,1) != "1") {
headers.append("Authorization: AWS " + AWSAccessKeyId + ":" +
calc_signature("PUT", "application/x-directory", date, headers.get(), resource));
@@ -2369,7 +3035,7 @@ static int s3fs_read(
const char *path, char *buf, size_t size, off_t offset, struct fuse_file_info *fi) {
int res = pread(fi->fh, buf, size, offset);
if(foreground)
cout << "read[path=" << path << "]" << endl;
cout << "s3fs_read[path=" << path << "]" << endl;
if (res == -1) {
YIKES(-errno);
}
@@ -2381,7 +3047,7 @@ static int s3fs_write(
int res = pwrite(fi->fh, buf, size, offset);
if(foreground)
cout << "write[path=" << path << "]" << endl;
cout << "s3fs_write[path=" << path << "]" << endl;
if (res == -1) {
YIKES(-errno);
}
@@ -2410,7 +3076,7 @@ static int s3fs_flush(const char *path, struct fuse_file_info *fi) {
int fd = fi->fh;
if(foreground)
cout << "flush[path=" << path << "][fd=" << fd << "]" << endl;
cout << "s3fs_flush[path=" << path << "][fd=" << fd << "]" << endl;
// NOTE- fi->flags is not available here
int flags = get_flags(fd);
@@ -2430,7 +3096,7 @@ static int s3fs_release(const char *path, struct fuse_file_info *fi) {
int fd = fi->fh;
if(foreground)
cout << "release[path=" << path << "][fd=" << fd << "]" << endl;
cout << "s3fs_release[path=" << path << "][fd=" << fd << "]" << endl;
if (close(fd) == -1) {
YIKES(-errno);
@@ -2851,18 +3517,28 @@ static int s3fs_access(const char *path, int mask) {
// aka touch
static int s3fs_utimens(const char *path, const struct timespec ts[2]) {
int result;
if(foreground)
cout << "utimens[path=" << path << "][mtime=" << str(ts[1].tv_sec) << "]" << endl;
headers_t meta;
if(foreground)
cout << "s3fs_utimens[path=" << path << "][mtime=" << str(ts[1].tv_sec) << "]" << endl;
if(foreground)
cout << " calling get_headers [path=" << path << "]" << endl;
result = get_headers(path, meta);
if(result != 0) {
return result;
}
meta["x-amz-meta-mtime"] = str(ts[1].tv_sec);
meta["x-amz-copy-source"] = urlEncode("/" + bucket + path);
meta["x-amz-metadata-directive"] = "REPLACE";
return put_headers(path, meta);
if(foreground)
cout << " calling put_headers [path=" << path << "]" << endl;
result = put_headers(path, meta);
return result;
}
///////////////////////////////////////////////////////////

src/s3fs.h

@@ -81,6 +81,7 @@ static pthread_mutex_t *mutex_buf = NULL;
static struct fuse_operations s3fs_oper;
string urlEncode(const string &s);
string lookupMimeType(string);
static int s3fs_getattr(const char *path, struct stat *stbuf);
static int s3fs_readlink(const char *path, char *buf, size_t size);
@@ -109,4 +110,5 @@ static int s3fs_utimens(const char *path, const struct timespec ts[2]);
static void* s3fs_init(struct fuse_conn_info *conn);
static void s3fs_destroy(void*);
#endif // S3FS_S3_H_

test/integration-test-main.sh

@@ -29,14 +29,17 @@ fi
# Write a small test file
for x in `seq 1 $TEST_TEXT_FILE_LENGTH`
do
echo $TEST_TEXT >> $TEST_TEXT_FILE
echo "echo ${TEST_TEXT} to ${TEST_TEXT_FILE}"
echo $TEST_TEXT >> $TEST_TEXT_FILE
done
# Verify contents of file
echo "Verifying length of test file"
FILE_LENGTH=`wc -l $TEST_TEXT_FILE | awk '{print $1}'`
if [ "$FILE_LENGTH" -ne "$TEST_TEXT_FILE_LENGTH" ]
then
exit 1
echo "error: expected $TEST_TEXT_FILE_LENGTH , got $FILE_LENGTH"
exit 1
fi
# Delete the test file