Merge pull request #859 from gaul/upload-remove-batching

Upload S3 parts without batching
This commit is contained in:
Takeshi Nakatani 2018-11-18 22:27:20 +09:00 committed by GitHub
commit a92668ae78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 148 additions and 43 deletions

View File

@ -50,6 +50,7 @@
#include "s3fs_util.h" #include "s3fs_util.h"
#include "s3fs_auth.h" #include "s3fs_auth.h"
#include "addhead.h" #include "addhead.h"
#include "psemaphore.h"
using namespace std; using namespace std;
@ -1321,56 +1322,49 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta,
} }
s3fscurl.DestroyCurlHandle(); s3fscurl.DestroyCurlHandle();
// Initialize S3fsMultiCurl
S3fsMultiCurl curlmulti;
curlmulti.SetSuccessCallback(S3fsCurl::UploadMultipartPostCallback);
curlmulti.SetRetryCallback(S3fsCurl::UploadMultipartPostRetryCallback);
// cycle through open fd, pulling off 10MB chunks at a time // cycle through open fd, pulling off 10MB chunks at a time
for(remaining_bytes = st.st_size; 0 < remaining_bytes; ){ for(remaining_bytes = st.st_size; 0 < remaining_bytes; ){
S3fsMultiCurl curlmulti; off_t chunk = remaining_bytes > S3fsCurl::multipart_size ? S3fsCurl::multipart_size : remaining_bytes;
int para_cnt;
off_t chunk;
// Initialize S3fsMultiCurl // s3fscurl sub object
curlmulti.SetSuccessCallback(S3fsCurl::UploadMultipartPostCallback); S3fsCurl* s3fscurl_para = new S3fsCurl(true);
curlmulti.SetRetryCallback(S3fsCurl::UploadMultipartPostRetryCallback); s3fscurl_para->partdata.fd = fd2;
s3fscurl_para->partdata.startpos = st.st_size - remaining_bytes;
s3fscurl_para->partdata.size = chunk;
s3fscurl_para->b_partdata_startpos = s3fscurl_para->partdata.startpos;
s3fscurl_para->b_partdata_size = s3fscurl_para->partdata.size;
s3fscurl_para->partdata.add_etag_list(&list);
// Loop for setup parallel upload(multipart) request. // initiate upload part for parallel
for(para_cnt = 0; para_cnt < S3fsCurl::max_parallel_cnt && 0 < remaining_bytes; para_cnt++, remaining_bytes -= chunk){ if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, list.size(), upload_id))){
// chunk size S3FS_PRN_ERR("failed uploading part setup(%d)", result);
chunk = remaining_bytes > S3fsCurl::multipart_size ? S3fsCurl::multipart_size : remaining_bytes; close(fd2);
delete s3fscurl_para;
// s3fscurl sub object return result;
S3fsCurl* s3fscurl_para = new S3fsCurl(true);
s3fscurl_para->partdata.fd = fd2;
s3fscurl_para->partdata.startpos = st.st_size - remaining_bytes;
s3fscurl_para->partdata.size = chunk;
s3fscurl_para->b_partdata_startpos = s3fscurl_para->partdata.startpos;
s3fscurl_para->b_partdata_size = s3fscurl_para->partdata.size;
s3fscurl_para->partdata.add_etag_list(&list);
// initiate upload part for parallel
if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, list.size(), upload_id))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
close(fd2);
delete s3fscurl_para;
return result;
}
// set into parallel object
if(!curlmulti.SetS3fsCurlObject(s3fscurl_para)){
S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
close(fd2);
delete s3fscurl_para;
return -1;
}
} }
// Multi request // set into parallel object
if(0 != (result = curlmulti.Request())){ if(!curlmulti.SetS3fsCurlObject(s3fscurl_para)){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result); S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
break; close(fd2);
delete s3fscurl_para;
return -1;
} }
// reinit for loop. remaining_bytes -= chunk;
curlmulti.Clear();
} }
// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
return result;
}
close(fd2); close(fd2);
if(0 != (result = s3fscurl.CompleteMultipartPostRequest(tpath, upload_id, list))){ if(0 != (result = s3fscurl.CompleteMultipartPostRequest(tpath, upload_id, list))){
@ -1666,7 +1660,8 @@ S3fsCurl::S3fsCurl(bool ahbe) :
hCurl(NULL), type(REQTYPE_UNSET), path(""), base_path(""), saved_path(""), url(""), requestHeaders(NULL), hCurl(NULL), type(REQTYPE_UNSET), path(""), base_path(""), saved_path(""), url(""), requestHeaders(NULL),
bodydata(NULL), headdata(NULL), LastResponseCode(-1), postdata(NULL), postdata_remaining(0), is_use_ahbe(ahbe), bodydata(NULL), headdata(NULL), LastResponseCode(-1), postdata(NULL), postdata_remaining(0), is_use_ahbe(ahbe),
retry_count(0), b_infile(NULL), b_postdata(NULL), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0), retry_count(0), b_infile(NULL), b_postdata(NULL), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0),
b_ssekey_pos(-1), b_ssevalue(""), b_ssetype(SSE_DISABLE), op(""), query_string("") b_ssekey_pos(-1), b_ssevalue(""), b_ssetype(SSE_DISABLE), op(""), query_string(""),
sem(NULL)
{ {
} }
@ -3908,11 +3903,39 @@ int S3fsMultiCurl::MultiPerform(void)
std::vector<pthread_t> threads; std::vector<pthread_t> threads;
bool success = true; bool success = true;
bool isMultiHead = false; bool isMultiHead = false;
Semaphore sem(S3fsCurl::max_parallel_cnt);
for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); ++iter) { for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); ++iter) {
pthread_t thread; pthread_t thread;
S3fsCurl* s3fscurl = (*iter).second; S3fsCurl* s3fscurl = (*iter).second;
int rc; int rc;
s3fscurl->sem = &sem;
sem.wait();
#ifndef __APPLE__
// macOS does not support pthread_tryjoin_np so we do not eagerly reap threads
for (std::vector<pthread_t>::iterator iter = threads.begin(); iter != threads.end(); ++iter) {
void* retval;
int rc;
rc = pthread_tryjoin_np(*iter, &retval);
if (rc == 0) {
iter = threads.erase(iter);
int int_retval = (int)(intptr_t)(retval);
if (int_retval && !(int_retval == ENOENT && isMultiHead)) {
S3FS_PRN_WARN("thread failed - rc(%d)", int_retval);
}
break;
} else if (rc == EBUSY) {
continue;
} else {
iter = threads.erase(iter);
success = false;
S3FS_PRN_ERR("failed pthread_tryjoin_np - rc(%d)", rc);
}
}
#endif
isMultiHead |= s3fscurl->GetOp() == "HEAD"; isMultiHead |= s3fscurl->GetOp() == "HEAD";
@ -3926,6 +3949,17 @@ int S3fsMultiCurl::MultiPerform(void)
threads.push_back(thread); threads.push_back(thread);
} }
for(int i = 0; i < S3fsCurl::max_parallel_cnt; ++i){
sem.wait();
}
#ifdef __APPLE__
// macOS cannot destroy a semaphore with posts less than the initializer
for(int i = 0; i < S3fsCurl::max_parallel_cnt; ++i){
sem.post();
}
#endif
for (std::vector<pthread_t>::iterator iter = threads.begin(); iter != threads.end(); ++iter) { for (std::vector<pthread_t>::iterator iter = threads.begin(); iter != threads.end(); ++iter) {
void* retval; void* retval;
int rc; int rc;
@ -4052,7 +4086,10 @@ int S3fsMultiCurl::Request(void)
// thread function for performing an S3fsCurl request // thread function for performing an S3fsCurl request
void* S3fsMultiCurl::RequestPerformWrapper(void* arg) { void* S3fsMultiCurl::RequestPerformWrapper(void* arg) {
return (void*)(intptr_t)(static_cast<S3fsCurl*>(arg)->RequestPerform()); S3fsCurl* s3fscurl = static_cast<S3fsCurl*>(arg);
void *result = (void*)(intptr_t)(s3fscurl->RequestPerform());
s3fscurl->sem->post();
return result;
} }
//------------------------------------------------------------------- //-------------------------------------------------------------------

View File

@ -23,6 +23,8 @@
#include <cassert> #include <cassert>
#include "psemaphore.h"
//---------------------------------------------- //----------------------------------------------
// Symbols // Symbols
//---------------------------------------------- //----------------------------------------------
@ -278,6 +280,7 @@ class S3fsCurl
sse_type_t b_ssetype; // backup for retrying sse_type_t b_ssetype; // backup for retrying
std::string op; // the HTTP verb of the request ("PUT", "GET", etc.) std::string op; // the HTTP verb of the request ("PUT", "GET", etc.)
std::string query_string; // request query string std::string query_string; // request query string
Semaphore *sem;
public: public:
// constructor/destructor // constructor/destructor

65
src/psemaphore.h Normal file
View File

@ -0,0 +1,65 @@
/*
* s3fs - FUSE-based file system backed by Amazon S3
*
* Copyright(C) 2007 Randy Rizun <rrizun@gmail.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#ifndef S3FS_SEMAPHONE_H_
#define S3FS_SEMAPHONE_H_
// portability wrapper for sem_t since macOS does not implement it
#ifdef __APPLE__
#include <dispatch/dispatch.h>
class Semaphore
{
public:
explicit Semaphore(int value) : sem(dispatch_semaphore_create(value)) {}
~Semaphore() { dispatch_release(sem); }
void wait() { dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); }
void post() { dispatch_semaphore_signal(sem); }
private:
dispatch_semaphore_t sem;
};
#else
#include <errno.h>
#include <semaphore.h>
class Semaphore
{
public:
explicit Semaphore(int value) { sem_init(&mutex, 0, value); }
~Semaphore() { sem_destroy(&mutex); }
void wait()
{
int r;
do {
r = sem_wait(&mutex);
} while (r == -1 && errno == EINTR);
}
void post() { sem_post(&mutex); }
private:
sem_t mutex;
};
#endif
#endif // S3FS_SEMAPHONE_H_