From 88cd8feb053980c808d67771d63a84ca25f6db8a Mon Sep 17 00:00:00 2001
From: Andrew Gaul
Date: Wed, 14 Nov 2018 16:48:57 -0800
Subject: [PATCH] Upload S3 parts without batching

Previously s3fs would issue a batch of uploads and wait for all to
succeed before issuing the next batch.  Now it issues the first batch
and only waits for a single part to succeed before uploading the next
part.  This can improve performance when one part lags due to network
errors.  Fixes #183.
---
 src/curl.cpp     | 123 ++++++++++++++++++++++++++++++-----------------
 src/curl.h       |   3 ++
 src/psemaphore.h |  65 +++++++++++++++++++++++++
 3 files changed, 148 insertions(+), 43 deletions(-)
 create mode 100644 src/psemaphore.h

diff --git a/src/curl.cpp b/src/curl.cpp
index c1a80a4..e82a24a 100644
--- a/src/curl.cpp
+++ b/src/curl.cpp
@@ -50,6 +50,7 @@
 #include "s3fs_util.h"
 #include "s3fs_auth.h"
 #include "addhead.h"
+#include "psemaphore.h"
 
 using namespace std;
 
@@ -1321,56 +1322,49 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta,
   }
   s3fscurl.DestroyCurlHandle();
 
+  // Initialize S3fsMultiCurl
+  S3fsMultiCurl curlmulti;
+  curlmulti.SetSuccessCallback(S3fsCurl::UploadMultipartPostCallback);
+  curlmulti.SetRetryCallback(S3fsCurl::UploadMultipartPostRetryCallback);
+
   // cycle through open fd, pulling off 10MB chunks at a time
   for(remaining_bytes = st.st_size; 0 < remaining_bytes; ){
-    S3fsMultiCurl curlmulti;
-    int para_cnt;
-    off_t chunk;
+    off_t chunk = remaining_bytes > S3fsCurl::multipart_size ? S3fsCurl::multipart_size : remaining_bytes;
 
-    // Initialize S3fsMultiCurl
-    curlmulti.SetSuccessCallback(S3fsCurl::UploadMultipartPostCallback);
-    curlmulti.SetRetryCallback(S3fsCurl::UploadMultipartPostRetryCallback);
+    // s3fscurl sub object
+    S3fsCurl* s3fscurl_para = new S3fsCurl(true);
+    s3fscurl_para->partdata.fd = fd2;
+    s3fscurl_para->partdata.startpos = st.st_size - remaining_bytes;
+    s3fscurl_para->partdata.size = chunk;
+    s3fscurl_para->b_partdata_startpos = s3fscurl_para->partdata.startpos;
+    s3fscurl_para->b_partdata_size = s3fscurl_para->partdata.size;
+    s3fscurl_para->partdata.add_etag_list(&list);
 
-    // Loop for setup parallel upload(multipart) request.
-    for(para_cnt = 0; para_cnt < S3fsCurl::max_parallel_cnt && 0 < remaining_bytes; para_cnt++, remaining_bytes -= chunk){
-      // chunk size
-      chunk = remaining_bytes > S3fsCurl::multipart_size ? S3fsCurl::multipart_size : remaining_bytes;
-
-      // s3fscurl sub object
-      S3fsCurl* s3fscurl_para = new S3fsCurl(true);
-      s3fscurl_para->partdata.fd = fd2;
-      s3fscurl_para->partdata.startpos = st.st_size - remaining_bytes;
-      s3fscurl_para->partdata.size = chunk;
-      s3fscurl_para->b_partdata_startpos = s3fscurl_para->partdata.startpos;
-      s3fscurl_para->b_partdata_size = s3fscurl_para->partdata.size;
-      s3fscurl_para->partdata.add_etag_list(&list);
-
-      // initiate upload part for parallel
-      if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, list.size(), upload_id))){
-        S3FS_PRN_ERR("failed uploading part setup(%d)", result);
-        close(fd2);
-        delete s3fscurl_para;
-        return result;
-      }
-
-      // set into parallel object
-      if(!curlmulti.SetS3fsCurlObject(s3fscurl_para)){
-        S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
-        close(fd2);
-        delete s3fscurl_para;
-        return -1;
-      }
+    // initiate upload part for parallel
+    if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, list.size(), upload_id))){
+      S3FS_PRN_ERR("failed uploading part setup(%d)", result);
+      close(fd2);
+      delete s3fscurl_para;
+      return result;
     }
 
-    // Multi request
-    if(0 != (result = curlmulti.Request())){
-      S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
-      break;
+    // set into parallel object
+    if(!curlmulti.SetS3fsCurlObject(s3fscurl_para)){
+      S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
+      close(fd2);
+      delete s3fscurl_para;
+      return -1;
     }
 
-    // reinit for loop.
-    curlmulti.Clear();
+    remaining_bytes -= chunk;
   }
+
+  // Multi request
+  if(0 != (result = curlmulti.Request())){
+    S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
+    return result;
+  }
+
   close(fd2);
 
   if(0 != (result = s3fscurl.CompleteMultipartPostRequest(tpath, upload_id, list))){
@@ -1666,7 +1660,8 @@ S3fsCurl::S3fsCurl(bool ahbe) :
     hCurl(NULL), type(REQTYPE_UNSET), path(""), base_path(""), saved_path(""), url(""), requestHeaders(NULL),
     bodydata(NULL), headdata(NULL), LastResponseCode(-1), postdata(NULL), postdata_remaining(0), is_use_ahbe(ahbe),
     retry_count(0), b_infile(NULL), b_postdata(NULL), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0),
-    b_ssekey_pos(-1), b_ssevalue(""), b_ssetype(SSE_DISABLE), op(""), query_string("")
+    b_ssekey_pos(-1), b_ssevalue(""), b_ssetype(SSE_DISABLE), op(""), query_string(""),
+    sem(NULL)
 {
 }
 
@@ -3908,11 +3903,39 @@ int S3fsMultiCurl::MultiPerform(void)
   std::vector<pthread_t> threads;
   bool success = true;
   bool isMultiHead = false;
+  Semaphore sem(S3fsCurl::max_parallel_cnt);
 
   for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); ++iter) {
     pthread_t thread;
     S3fsCurl* s3fscurl = (*iter).second;
     int rc;
+    s3fscurl->sem = &sem;
+
+    sem.wait();
+
+#ifndef __APPLE__
+    // macOS does not support pthread_tryjoin_np so we do not eagerly reap threads
+    for (std::vector<pthread_t>::iterator iter = threads.begin(); iter != threads.end(); ++iter) {
+      void* retval;
+      int rc;
+
+      rc = pthread_tryjoin_np(*iter, &retval);
+      if (rc == 0) {
+        iter = threads.erase(iter);
+        int int_retval = (int)(intptr_t)(retval);
+        if (int_retval && !(int_retval == ENOENT && isMultiHead)) {
+          S3FS_PRN_WARN("thread failed - rc(%d)", int_retval);
+        }
+        break;
+      } else if (rc == EBUSY) {
+        continue;
+      } else {
+        iter = threads.erase(iter);
+        success = false;
+        S3FS_PRN_ERR("failed pthread_tryjoin_np - rc(%d)", rc);
+      }
+    }
+#endif
 
     isMultiHead |= s3fscurl->GetOp() == "HEAD";
 
@@ -3926,6 +3949,17 @@ int S3fsMultiCurl::MultiPerform(void)
     threads.push_back(thread);
   }
 
+  for(int i = 0; i < S3fsCurl::max_parallel_cnt; ++i){
+    sem.wait();
+  }
+
+#ifdef __APPLE__
+  // macOS cannot destroy a semaphore with posts less than the initializer
+  for(int i = 0; i < S3fsCurl::max_parallel_cnt; ++i){
+    sem.post();
+  }
+#endif
+
   for (std::vector<pthread_t>::iterator iter = threads.begin(); iter != threads.end(); ++iter) {
     void* retval;
     int rc;
@@ -4052,7 +4086,10 @@ int S3fsMultiCurl::Request(void)
 // thread function for performing an S3fsCurl request
 void* S3fsMultiCurl::RequestPerformWrapper(void* arg)
 {
-  return (void*)(intptr_t)(static_cast<S3fsCurl*>(arg)->RequestPerform());
+  S3fsCurl* s3fscurl = static_cast<S3fsCurl*>(arg);
+  void *result = (void*)(intptr_t)(s3fscurl->RequestPerform());
+  s3fscurl->sem->post();
+  return result;
 }
 
 //-------------------------------------------------------------------
diff --git a/src/curl.h b/src/curl.h
index e18e253..f7796c2 100644
--- a/src/curl.h
+++ b/src/curl.h
@@ -23,6 +23,8 @@
 
 #include <cassert>
 
+#include "psemaphore.h"
+
 //----------------------------------------------
 // Symbols
 //----------------------------------------------
@@ -278,6 +280,7 @@ class S3fsCurl
     sse_type_t b_ssetype;        // backup for retrying
     std::string op;              // the HTTP verb of the request ("PUT", "GET", etc.)
     std::string query_string;    // request query string
+    Semaphore *sem;
 
   public:
     // constructor/destructor
diff --git a/src/psemaphore.h b/src/psemaphore.h
new file mode 100644
index 0000000..2988216
--- /dev/null
+++ b/src/psemaphore.h
@@ -0,0 +1,65 @@
+/*
+ * s3fs - FUSE-based file system backed by Amazon S3
+ *
+ * Copyright(C) 2007 Randy Rizun
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+#ifndef S3FS_SEMAPHONE_H_
+#define S3FS_SEMAPHONE_H_
+
+// portability wrapper for sem_t since macOS does not implement it
+
+#ifdef __APPLE__
+
+#include <dispatch/dispatch.h>
+
+class Semaphore
+{
+  public:
+    explicit Semaphore(int value) : sem(dispatch_semaphore_create(value)) {}
+    ~Semaphore() { dispatch_release(sem); }
+    void wait() { dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); }
+    void post() { dispatch_semaphore_signal(sem); }
+  private:
+    dispatch_semaphore_t sem;
+};
+
+#else
+
+#include <errno.h>
+#include <semaphore.h>
+
+class Semaphore
+{
+  public:
+    explicit Semaphore(int value) { sem_init(&mutex, 0, value); }
+    ~Semaphore() { sem_destroy(&mutex); }
+    void wait()
+    {
+      int r;
+      do {
+        r = sem_wait(&mutex);
+      } while (r == -1 && errno == EINTR);
+    }
+    void post() { sem_post(&mutex); }
+  private:
+    sem_t mutex;
+};
+
+#endif
+
+#endif // S3FS_SEMAPHONE_H_
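
The core of the change is the semaphore-gated dispatch in S3fsMultiCurl::MultiPerform(): the dispatcher takes one slot from a counting semaphore before spawning each worker thread, each worker returns its slot in RequestPerformWrapper() when it finishes, and re-acquiring all max_parallel_cnt slots at the end guarantees every part has completed. The standalone sketch below illustrates the same pattern outside of s3fs; the names (MAX_PARALLEL, upload_part, dispatch_parts) are illustrative only, and it uses POSIX semaphores directly rather than the psemaphore.h wrapper, so it does not cover the macOS-specific handling above.

// Standalone sketch of the semaphore-gated dispatch used by the patch.
// Hypothetical names; build with: g++ -pthread sketch.cpp
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <vector>

static const int MAX_PARALLEL = 5;   // stand-in for S3fsCurl::max_parallel_cnt
static sem_t     slots;              // counts free upload slots

static void* upload_part(void* arg)
{
  // ... perform one part upload here ...
  printf("part %d done\n", (int)(intptr_t)arg);
  sem_post(&slots);                  // release the slot, like RequestPerformWrapper()
  return NULL;
}

static void dispatch_parts(int part_count)
{
  std::vector<pthread_t> threads;
  sem_init(&slots, 0, MAX_PARALLEL);

  for(int part = 0; part < part_count; ++part){
    // Block until fewer than MAX_PARALLEL uploads are in flight,
    // instead of waiting for a whole batch to finish.
    while(sem_wait(&slots) == -1 && errno == EINTR){ /* retry on EINTR */ }

    pthread_t thread;
    pthread_create(&thread, NULL, upload_part, (void*)(intptr_t)part);
    threads.push_back(thread);
  }

  // Drain all slots: once MAX_PARALLEL slots are re-acquired, every worker has posted.
  for(int i = 0; i < MAX_PARALLEL; ++i){
    while(sem_wait(&slots) == -1 && errno == EINTR){ /* retry on EINTR */ }
  }

  // Reap the remaining thread handles.
  for(size_t i = 0; i < threads.size(); ++i){
    pthread_join(threads[i], NULL);
  }
  sem_destroy(&slots);
}

int main()
{
  dispatch_parts(20);
  return 0;
}

Draining the semaphore replaces the old per-batch barrier: a new part may start as soon as any single in-flight part finishes. As the patch's comment notes, the macOS branch must post the semaphore back up to its initial value before destruction, because a libdispatch semaphore cannot be destroyed while its count is below the value it was created with.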