From 96764b7410ed151d77a2d262d85d217de67fb3eb Mon Sep 17 00:00:00 2001 From: Or Ozeri Date: Wed, 29 Mar 2017 10:13:05 +0300 Subject: [PATCH] switch S3fsMultiCurl to use foreground threads --- src/curl.cpp | 231 ++++++++++++++++++--------------------------------- src/curl.h | 3 +- 2 files changed, 82 insertions(+), 152 deletions(-) diff --git a/src/curl.cpp b/src/curl.cpp index 7d61341..786603f 100644 --- a/src/curl.cpp +++ b/src/curl.cpp @@ -3754,7 +3754,7 @@ int S3fsMultiCurl::SetMaxMultiRequest(int max) //------------------------------------------------------------------- // method for S3fsMultiCurl //------------------------------------------------------------------- -S3fsMultiCurl::S3fsMultiCurl() : hMulti(NULL), SuccessCallback(NULL), RetryCallback(NULL) +S3fsMultiCurl::S3fsMultiCurl() : SuccessCallback(NULL), RetryCallback(NULL) { } @@ -3767,22 +3767,13 @@ bool S3fsMultiCurl::ClearEx(bool is_all) { s3fscurlmap_t::iterator iter; for(iter = cMap_req.begin(); iter != cMap_req.end(); cMap_req.erase(iter++)){ - CURL* hCurl = (*iter).first; S3fsCurl* s3fscurl = (*iter).second; - if(hMulti && hCurl){ - curl_multi_remove_handle(hMulti, hCurl); - } if(s3fscurl){ s3fscurl->DestroyCurlHandle(); delete s3fscurl; // with destroy curl handle. } } - if(hMulti){ - curl_multi_cleanup(hMulti); - hMulti = NULL; - } - if(is_all){ for(iter = cMap_all.begin(); iter != cMap_all.end(); cMap_all.erase(iter++)){ S3fsCurl* s3fscurl = (*iter).second; @@ -3811,10 +3802,6 @@ S3fsMultiRetryCallback S3fsMultiCurl::SetRetryCallback(S3fsMultiRetryCallback fu bool S3fsMultiCurl::SetS3fsCurlObject(S3fsCurl* s3fscurl) { - if(hMulti){ - S3FS_PRN_ERR("Internal error: hMulti is not null"); - return false; - } if(!s3fscurl){ return false; } @@ -3827,148 +3814,102 @@ bool S3fsMultiCurl::SetS3fsCurlObject(S3fsCurl* s3fscurl) int S3fsMultiCurl::MultiPerform(void) { - CURLMcode curlm_code; - int still_running; + std::vector threads; + bool success = true; - if(!hMulti){ - return -1; + for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); ++iter) { + pthread_t thread; + S3fsCurl* s3fscurl = (*iter).second; + int rc; + + rc = pthread_create(&thread, NULL, S3fsMultiCurl::RequestPerformWrapper, static_cast(s3fscurl)); + if (rc != 0) { + success = false; + S3FS_PRN_ERR("failed pthread_create - rc(%d)", rc); + break; + } + + threads.push_back(thread); } - // Send multi request. - do{ - // Start making requests and check running. - still_running = 0; - do { - curlm_code = curl_multi_perform(hMulti, &still_running); - } while(curlm_code == CURLM_CALL_MULTI_PERFORM); + for (std::vector::iterator iter = threads.begin(); iter != threads.end(); ++iter) { + void* retval; + int rc; - if(curlm_code != CURLM_OK) { - S3FS_PRN_DBG("curl_multi_perform code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code)); - } - - // Set timer when still running - if(still_running) { - long milliseconds; - fd_set r_fd; - fd_set w_fd; - fd_set e_fd; - FD_ZERO(&r_fd); - FD_ZERO(&w_fd); - FD_ZERO(&e_fd); - - if(CURLM_OK != (curlm_code = curl_multi_timeout(hMulti, &milliseconds))){ - S3FS_PRN_DBG("curl_multi_timeout code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code)); - } - if(milliseconds < 0){ - milliseconds = 50; - } - if(milliseconds > 0) { - int max_fd; - struct timeval timeout; - timeout.tv_sec = 1000 * milliseconds / 1000000; - timeout.tv_usec = 1000 * milliseconds % 1000000; - - if(CURLM_OK != (curlm_code = curl_multi_fdset(hMulti, &r_fd, &w_fd, &e_fd, &max_fd))){ - S3FS_PRN_ERR("curl_multi_fdset code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code)); - return -EIO; - } - if(-1 == select(max_fd + 1, &r_fd, &w_fd, &e_fd, &timeout)){ - S3FS_PRN_ERR("failed select - errno(%d)", errno); - return -errno; - } + rc = pthread_join(*iter, &retval); + if (rc) { + success = false; + S3FS_PRN_ERR("failed pthread_join - rc(%d)", rc); + } else { + int int_retval = (int)(intptr_t)(retval); + if (int_retval) { + S3FS_PRN_ERR("thread failed - rc(%d)", int_retval); + success = false; } } - }while(still_running); + } - return 0; + return success ? 0 : -EIO; } int S3fsMultiCurl::MultiRead(void) { - CURLMsg* msg; - int remaining_messages; - CURL* hCurl = NULL; - S3fsCurl* s3fscurl = NULL; - S3fsCurl* retrycurl= NULL; + for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); cMap_req.erase(iter++)) { + S3fsCurl* s3fscurl = (*iter).second; - while(NULL != (msg = curl_multi_info_read(hMulti, &remaining_messages))){ - if(CURLMSG_DONE != msg->msg){ - S3FS_PRN_ERR("curl_multi_info_read code: %d", msg->msg); - return -EIO; - } - hCurl = msg->easy_handle; - s3fscurlmap_t::iterator iter; - if(cMap_req.end() != (iter = cMap_req.find(hCurl))){ - s3fscurl = iter->second; - }else{ - s3fscurl = NULL; - } - retrycurl= NULL; + bool isRetry = false; - if(s3fscurl){ - bool isRetry = false; - if(CURLE_OK == msg->data.result){ - long responseCode = -1; - if(s3fscurl->GetResponseCode(responseCode)){ - if(400 > responseCode){ - // add into stat cache - if(SuccessCallback && !SuccessCallback(s3fscurl)){ - S3FS_PRN_WARN("error from callback function(%s).", s3fscurl->url.c_str()); - } - }else if(400 == responseCode){ - // as possibly in multipart - S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str()); - isRetry = true; - }else if(404 == responseCode){ - // not found - S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str()); - }else if(500 == responseCode){ - // case of all other result, do retry.(11/13/2013) - // because it was found that s3fs got 500 error from S3, but could success - // to retry it. - S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str()); - isRetry = true; - }else{ - // Retry in other case. - S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str()); - isRetry = true; - } - }else{ - S3FS_PRN_ERR("failed a request(Unknown response code: %s)", s3fscurl->url.c_str()); + long responseCode = -1; + if(s3fscurl->GetResponseCode(responseCode)){ + if(400 > responseCode){ + // add into stat cache + if(SuccessCallback && !SuccessCallback(s3fscurl)){ + S3FS_PRN_WARN("error from callback function(%s).", s3fscurl->url.c_str()); } + }else if(400 == responseCode){ + // as possibly in multipart + S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str()); + isRetry = true; + }else if(404 == responseCode){ + // not found + S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str()); + }else if(500 == responseCode){ + // case of all other result, do retry.(11/13/2013) + // because it was found that s3fs got 500 error from S3, but could success + // to retry it. + S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str()); + isRetry = true; }else{ - S3FS_PRN_WARN("failed to read(remaining: %d code: %d msg: %s), so retry this.", - remaining_messages, msg->data.result, curl_easy_strerror(msg->data.result)); + // Retry in other case. + S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str()); isRetry = true; } + }else{ + S3FS_PRN_ERR("failed a request(Unknown response code: %s)", s3fscurl->url.c_str()); + } - if(!isRetry){ - cMap_req.erase(hCurl); - curl_multi_remove_handle(hMulti, hCurl); - s3fscurl->DestroyCurlHandle(); - delete s3fscurl; + if(!isRetry){ + s3fscurl->DestroyCurlHandle(); + delete s3fscurl; - }else{ - cMap_req.erase(hCurl); - curl_multi_remove_handle(hMulti, hCurl); + }else{ + S3fsCurl* retrycurl = NULL; - // For retry - if(RetryCallback){ - if(NULL != (retrycurl = RetryCallback(s3fscurl))){ - cMap_all[retrycurl->hCurl] = retrycurl; - }else{ - // Could not set up callback. - return -EIO; - } - } - if(s3fscurl != retrycurl){ - s3fscurl->DestroyCurlHandle(); - delete s3fscurl; + // For retry + if(RetryCallback){ + retrycurl = RetryCallback(s3fscurl); + if(NULL != retrycurl){ + cMap_all[retrycurl->hCurl] = retrycurl; + }else{ + // Could not set up callback. + return -EIO; } } - }else{ - assert(false); + if(s3fscurl != retrycurl){ + s3fscurl->DestroyCurlHandle(); + delete s3fscurl; + } } } return 0; @@ -3976,28 +3917,16 @@ int S3fsMultiCurl::MultiRead(void) int S3fsMultiCurl::Request(void) { - int result; - CURLMcode curlm_code; + int result; S3FS_PRN_INFO3("[count=%zu]", cMap_all.size()); - if(hMulti){ - S3FS_PRN_DBG("Warning: hMulti is not null, thus clear itself."); - ClearEx(false); - } - // Make request list. // // Send multi request loop( with retry ) // (When many request is sends, sometimes gets "Couldn't connect to server") // while(!cMap_all.empty()){ - // populate the multi interface with an initial set of requests - if(NULL == (hMulti = curl_multi_init())){ - Clear(); - return -1; - } - // set curl handle to multi handle int cnt; s3fscurlmap_t::iterator iter; @@ -4005,11 +3934,6 @@ int S3fsMultiCurl::Request(void) CURL* hCurl = (*iter).first; S3fsCurl* s3fscurl = (*iter).second; - if(CURLM_OK != (curlm_code = curl_multi_add_handle(hMulti, hCurl))){ - S3FS_PRN_ERR("curl_multi_add_handle code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code)); - Clear(); - return -EIO; - } cMap_req[hCurl] = s3fscurl; } @@ -4031,6 +3955,11 @@ int S3fsMultiCurl::Request(void) return 0; } +// thread function for performing an S3fsCurl request +void* S3fsMultiCurl::RequestPerformWrapper(void* arg) { + return (void*)(intptr_t)(static_cast(arg)->RequestPerform()); +} + //------------------------------------------------------------------- // Utility functions //------------------------------------------------------------------- diff --git a/src/curl.h b/src/curl.h index 4fa3726..4e99864 100644 --- a/src/curl.h +++ b/src/curl.h @@ -441,7 +441,6 @@ class S3fsMultiCurl private: static int max_multireq; - CURLM* hMulti; s3fscurlmap_t cMap_all; // all of curl requests s3fscurlmap_t cMap_req; // curl requests are sent @@ -453,6 +452,8 @@ class S3fsMultiCurl int MultiPerform(void); int MultiRead(void); + static void* RequestPerformWrapper(void* arg); + public: S3fsMultiCurl(); ~S3fsMultiCurl();