diff --git a/src/curl.cpp b/src/curl.cpp index 64070ed..532539a 100644 --- a/src/curl.cpp +++ b/src/curl.cpp @@ -374,6 +374,7 @@ string S3fsCurl::curl_ca_bundle; mimes_t S3fsCurl::mimeTypes; string S3fsCurl::userAgent; int S3fsCurl::max_parallel_cnt = 5; // default +int S3fsCurl::max_multireq = 20; // default off_t S3fsCurl::multipart_size = MULTIPART_SIZE; // default bool S3fsCurl::is_sigv4 = true; // default bool S3fsCurl::is_ua = true; // default @@ -1259,6 +1260,13 @@ int S3fsCurl::SetMaxParallelCount(int value) return old; } +int S3fsCurl::SetMaxMultiRequest(int max) +{ + int old = S3fsCurl::max_multireq; + S3fsCurl::max_multireq = max; + return old; +} + bool S3fsCurl::UploadMultipartPostCallback(S3fsCurl* s3fscurl) { if(!s3fscurl){ @@ -1343,7 +1351,7 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, s3fscurl.DestroyCurlHandle(); // Initialize S3fsMultiCurl - S3fsMultiCurl curlmulti; + S3fsMultiCurl curlmulti(GetMaxParallelCount()); curlmulti.SetSuccessCallback(S3fsCurl::UploadMultipartPostCallback); curlmulti.SetRetryCallback(S3fsCurl::UploadMultipartPostRetryCallback); @@ -1433,7 +1441,7 @@ int S3fsCurl::ParallelGetObjectRequest(const char* tpath, int fd, off_t start, s // cycle through open fd, pulling off 10MB chunks at a time for(remaining_bytes = size; 0 < remaining_bytes; ){ - S3fsMultiCurl curlmulti; + S3fsMultiCurl curlmulti(GetMaxParallelCount()); int para_cnt; off_t chunk; @@ -3847,27 +3855,13 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t return 0; } -//------------------------------------------------------------------- -// Class S3fsMultiCurl -//------------------------------------------------------------------- -static const int MAX_MULTI_HEADREQ = 20; // default: max request count in readdir curl_multi. - -//------------------------------------------------------------------- -// Class method for S3fsMultiCurl -//------------------------------------------------------------------- -int S3fsMultiCurl::max_multireq = MAX_MULTI_HEADREQ; - -int S3fsMultiCurl::SetMaxMultiRequest(int max) -{ - int old = S3fsMultiCurl::max_multireq; - S3fsMultiCurl::max_multireq= max; - return old; -} - //------------------------------------------------------------------- // method for S3fsMultiCurl //------------------------------------------------------------------- -S3fsMultiCurl::S3fsMultiCurl() : SuccessCallback(NULL), RetryCallback(NULL) +S3fsMultiCurl::S3fsMultiCurl(int maxParallelism) + : maxParallelism(maxParallelism) + , SuccessCallback(NULL) + , RetryCallback(NULL) { } @@ -3930,7 +3924,7 @@ int S3fsMultiCurl::MultiPerform(void) std::vector threads; bool success = true; bool isMultiHead = false; - Semaphore sem(S3fsCurl::max_parallel_cnt); + Semaphore sem(GetMaxParallelism()); int rc; for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); ++iter) { @@ -3975,17 +3969,10 @@ int S3fsMultiCurl::MultiPerform(void) threads.push_back(thread); } - for(int i = 0; i < S3fsCurl::max_parallel_cnt; ++i){ + for(int i = 0; i < sem.get_value(); ++i){ sem.wait(); } -#ifdef __APPLE__ - // macOS cannot destroy a semaphore with posts less than the initializer - for(int i = 0; i < S3fsCurl::max_parallel_cnt; ++i){ - sem.post(); - } -#endif - for (std::vector::iterator titer = threads.begin(); titer != threads.end(); ++titer) { void* retval; @@ -4082,9 +4069,8 @@ int S3fsMultiCurl::Request(void) while(!cMap_all.empty()){ // set curl handle to multi handle int result; - int cnt; s3fscurlmap_t::iterator iter; - for(cnt = 0, iter = cMap_all.begin(); cnt < S3fsMultiCurl::max_multireq && iter != cMap_all.end(); cMap_all.erase(iter++), cnt++){ + for(iter = cMap_all.begin(); iter != cMap_all.end(); cMap_all.erase(iter++)){ CURL* hCurl = (*iter).first; S3fsCurl* s3fscurl = (*iter).second; diff --git a/src/curl.h b/src/curl.h index f7796c2..3563cd8 100644 --- a/src/curl.h +++ b/src/curl.h @@ -249,6 +249,7 @@ class S3fsCurl static mimes_t mimeTypes; static std::string userAgent; static int max_parallel_cnt; + static int max_multireq; static off_t multipart_size; static bool is_sigv4; static bool is_ua; // User-Agent @@ -389,8 +390,12 @@ class S3fsCurl } static long SetSslVerifyHostname(long value); static long GetSslVerifyHostname(void) { return S3fsCurl::ssl_verify_hostname; } + // maximum parallel GET and PUT requests static int SetMaxParallelCount(int value); static int GetMaxParallelCount(void) { return S3fsCurl::max_parallel_cnt; } + // maximum parallel HEAD requests + static int SetMaxMultiRequest(int max); + static int GetMaxMultiRequest(void) { return S3fsCurl::max_multireq; } static bool SetIsECS(bool flag); static bool SetIsIBMIAMAuth(bool flag); static size_t SetIAMFieldCount(size_t field_count); @@ -470,7 +475,7 @@ typedef S3fsCurl* (*S3fsMultiRetryCallback)(S3fsCurl* s3fscurl); // callback for class S3fsMultiCurl { private: - static int max_multireq; + const int maxParallelism; s3fscurlmap_t cMap_all; // all of curl requests s3fscurlmap_t cMap_req; // curl requests are sent @@ -486,11 +491,10 @@ class S3fsMultiCurl static void* RequestPerformWrapper(void* arg); public: - S3fsMultiCurl(); + explicit S3fsMultiCurl(int maxParallelism); ~S3fsMultiCurl(); - static int SetMaxMultiRequest(int max); - static int GetMaxMultiRequest(void) { return S3fsMultiCurl::max_multireq; } + int GetMaxParallelism() { return maxParallelism; } S3fsMultiSuccessCallback SetSuccessCallback(S3fsMultiSuccessCallback function); S3fsMultiRetryCallback SetRetryCallback(S3fsMultiRetryCallback function); diff --git a/src/psemaphore.h b/src/psemaphore.h index 2988216..5206a4e 100644 --- a/src/psemaphore.h +++ b/src/psemaphore.h @@ -30,11 +30,19 @@ class Semaphore { public: - explicit Semaphore(int value) : sem(dispatch_semaphore_create(value)) {} - ~Semaphore() { dispatch_release(sem); } + explicit Semaphore(int value) : value(value), sem(dispatch_semaphore_create(value)) {} + ~Semaphore() { + // macOS cannot destroy a semaphore with posts less than the initializer + for(int i = 0; i < get_value(); ++i){ + post(); + } + dispatch_release(sem); + } void wait() { dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); } void post() { dispatch_semaphore_signal(sem); } + int get_value() const { return value; } private: + const int value; dispatch_semaphore_t sem; }; @@ -46,7 +54,7 @@ class Semaphore class Semaphore { public: - explicit Semaphore(int value) { sem_init(&mutex, 0, value); } + explicit Semaphore(int value) : value(value) { sem_init(&mutex, 0, value); } ~Semaphore() { sem_destroy(&mutex); } void wait() { @@ -56,7 +64,9 @@ class Semaphore } while (r == -1 && errno == EINTR); } void post() { sem_post(&mutex); } + int get_value() const { return value; } private: + const int value; sem_t mutex; }; diff --git a/src/s3fs.cpp b/src/s3fs.cpp index b99680e..673c8b1 100644 --- a/src/s3fs.cpp +++ b/src/s3fs.cpp @@ -2358,7 +2358,7 @@ static S3fsCurl* multi_head_retry_callback(S3fsCurl* s3fscurl) static int readdir_multi_head(const char* path, S3ObjList& head, void* buf, fuse_fill_dir_t filler) { - S3fsMultiCurl curlmulti; + S3fsMultiCurl curlmulti(S3fsCurl::GetMaxMultiRequest()); s3obj_list_t headlist; s3obj_list_t fillerlist; int result = 0; @@ -2372,75 +2372,69 @@ static int readdir_multi_head(const char* path, S3ObjList& head, void* buf, fuse curlmulti.SetSuccessCallback(multi_head_callback); curlmulti.SetRetryCallback(multi_head_retry_callback); - // Loop - while(!headlist.empty()){ - s3obj_list_t::iterator iter; - long cnt; + // TODO: deindent + s3obj_list_t::iterator iter; - fillerlist.clear(); - // Make single head request(with max). - for(iter = headlist.begin(), cnt = 0; headlist.end() != iter && cnt < S3fsMultiCurl::GetMaxMultiRequest(); iter = headlist.erase(iter)){ - string disppath = path + (*iter); - string etag = head.GetETag((*iter).c_str()); + fillerlist.clear(); + // Make single head request(with max). + for(iter = headlist.begin(); headlist.end() != iter; iter = headlist.erase(iter)){ + string disppath = path + (*iter); + string etag = head.GetETag((*iter).c_str()); - string fillpath = disppath; - if('/' == disppath[disppath.length() - 1]){ - fillpath = fillpath.substr(0, fillpath.length() -1); - } - fillerlist.push_back(fillpath); + string fillpath = disppath; + if('/' == disppath[disppath.length() - 1]){ + fillpath = fillpath.substr(0, fillpath.length() -1); + } + fillerlist.push_back(fillpath); - if(StatCache::getStatCacheData()->HasStat(disppath, etag.c_str())){ - continue; - } - - // First check for directory, start checking "not SSE-C". - // If checking failed, retry to check with "SSE-C" by retry callback func when SSE-C mode. - S3fsCurl* s3fscurl = new S3fsCurl(); - if(!s3fscurl->PreHeadRequest(disppath, (*iter), disppath)){ // target path = cache key path.(ex "dir/") - S3FS_PRN_WARN("Could not make curl object for head request(%s).", disppath.c_str()); - delete s3fscurl; - continue; - } - - if(!curlmulti.SetS3fsCurlObject(s3fscurl)){ - S3FS_PRN_WARN("Could not make curl object into multi curl(%s).", disppath.c_str()); - delete s3fscurl; - continue; - } - cnt++; // max request count within S3fsMultiCurl::GetMaxMultiRequest() + if(StatCache::getStatCacheData()->HasStat(disppath, etag.c_str())){ + continue; } - // Multi request - if(0 != (result = curlmulti.Request())){ - // If result is -EIO, it is something error occurred. - // This case includes that the object is encrypting(SSE) and s3fs does not have keys. - // So s3fs set result to 0 in order to continue the process. - if(-EIO == result){ - S3FS_PRN_WARN("error occurred in multi request(errno=%d), but continue...", result); - result = 0; - }else{ - S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result); - break; - } + // First check for directory, start checking "not SSE-C". + // If checking failed, retry to check with "SSE-C" by retry callback func when SSE-C mode. + S3fsCurl* s3fscurl = new S3fsCurl(); + if(!s3fscurl->PreHeadRequest(disppath, (*iter), disppath)){ // target path = cache key path.(ex "dir/") + S3FS_PRN_WARN("Could not make curl object for head request(%s).", disppath.c_str()); + delete s3fscurl; + continue; } - // populate fuse buffer - // here is best position, because a case is cache size < files in directory - // - for(iter = fillerlist.begin(); fillerlist.end() != iter; ++iter){ - struct stat st; - string bpath = mybasename((*iter)); - if(StatCache::getStatCacheData()->GetStat((*iter), &st)){ - filler(buf, bpath.c_str(), &st, 0); - }else{ - S3FS_PRN_INFO2("Could not find %s file in stat cache.", (*iter).c_str()); - filler(buf, bpath.c_str(), 0, 0); - } + if(!curlmulti.SetS3fsCurlObject(s3fscurl)){ + S3FS_PRN_WARN("Could not make curl object into multi curl(%s).", disppath.c_str()); + delete s3fscurl; + continue; } - - // reinit for loop. - curlmulti.Clear(); } + + // Multi request + if(0 != (result = curlmulti.Request())){ + // If result is -EIO, it is something error occurred. + // This case includes that the object is encrypting(SSE) and s3fs does not have keys. + // So s3fs set result to 0 in order to continue the process. + if(-EIO == result){ + S3FS_PRN_WARN("error occurred in multi request(errno=%d), but continue...", result); + result = 0; + }else{ + S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result); + return result; + } + } + + // populate fuse buffer + // here is best position, because a case is cache size < files in directory + // + for(iter = fillerlist.begin(); fillerlist.end() != iter; ++iter){ + struct stat st; + string bpath = mybasename((*iter)); + if(StatCache::getStatCacheData()->GetStat((*iter), &st)){ + filler(buf, bpath.c_str(), &st, 0); + }else{ + S3FS_PRN_INFO2("Could not find %s file in stat cache.", (*iter).c_str()); + filler(buf, bpath.c_str(), 0, 0); + } + } + return result; } @@ -4445,7 +4439,7 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar } if(0 == STR2NCMP(arg, "multireq_max=")){ long maxreq = static_cast(s3fs_strtoofft(strchr(arg, '=') + sizeof(char))); - S3fsMultiCurl::SetMaxMultiRequest(maxreq); + S3fsCurl::SetMaxMultiRequest(maxreq); return 0; } if(0 == strcmp(arg, "nonempty")){