switch S3fsMultiCurl to use foreground threads

This commit is contained in:
Or Ozeri 2017-03-29 10:13:05 +03:00
parent 43df94719b
commit 96764b7410
2 changed files with 82 additions and 152 deletions

View File

@ -3754,7 +3754,7 @@ int S3fsMultiCurl::SetMaxMultiRequest(int max)
//------------------------------------------------------------------- //-------------------------------------------------------------------
// method for S3fsMultiCurl // method for S3fsMultiCurl
//------------------------------------------------------------------- //-------------------------------------------------------------------
S3fsMultiCurl::S3fsMultiCurl() : hMulti(NULL), SuccessCallback(NULL), RetryCallback(NULL) S3fsMultiCurl::S3fsMultiCurl() : SuccessCallback(NULL), RetryCallback(NULL)
{ {
} }
@ -3767,22 +3767,13 @@ bool S3fsMultiCurl::ClearEx(bool is_all)
{ {
s3fscurlmap_t::iterator iter; s3fscurlmap_t::iterator iter;
for(iter = cMap_req.begin(); iter != cMap_req.end(); cMap_req.erase(iter++)){ for(iter = cMap_req.begin(); iter != cMap_req.end(); cMap_req.erase(iter++)){
CURL* hCurl = (*iter).first;
S3fsCurl* s3fscurl = (*iter).second; S3fsCurl* s3fscurl = (*iter).second;
if(hMulti && hCurl){
curl_multi_remove_handle(hMulti, hCurl);
}
if(s3fscurl){ if(s3fscurl){
s3fscurl->DestroyCurlHandle(); s3fscurl->DestroyCurlHandle();
delete s3fscurl; // with destroy curl handle. delete s3fscurl; // with destroy curl handle.
} }
} }
if(hMulti){
curl_multi_cleanup(hMulti);
hMulti = NULL;
}
if(is_all){ if(is_all){
for(iter = cMap_all.begin(); iter != cMap_all.end(); cMap_all.erase(iter++)){ for(iter = cMap_all.begin(); iter != cMap_all.end(); cMap_all.erase(iter++)){
S3fsCurl* s3fscurl = (*iter).second; S3fsCurl* s3fscurl = (*iter).second;
@ -3811,10 +3802,6 @@ S3fsMultiRetryCallback S3fsMultiCurl::SetRetryCallback(S3fsMultiRetryCallback fu
bool S3fsMultiCurl::SetS3fsCurlObject(S3fsCurl* s3fscurl) bool S3fsMultiCurl::SetS3fsCurlObject(S3fsCurl* s3fscurl)
{ {
if(hMulti){
S3FS_PRN_ERR("Internal error: hMulti is not null");
return false;
}
if(!s3fscurl){ if(!s3fscurl){
return false; return false;
} }
@ -3827,148 +3814,102 @@ bool S3fsMultiCurl::SetS3fsCurlObject(S3fsCurl* s3fscurl)
int S3fsMultiCurl::MultiPerform(void) int S3fsMultiCurl::MultiPerform(void)
{ {
CURLMcode curlm_code; std::vector<pthread_t> threads;
int still_running; bool success = true;
if(!hMulti){ for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); ++iter) {
return -1; pthread_t thread;
S3fsCurl* s3fscurl = (*iter).second;
int rc;
rc = pthread_create(&thread, NULL, S3fsMultiCurl::RequestPerformWrapper, static_cast<void*>(s3fscurl));
if (rc != 0) {
success = false;
S3FS_PRN_ERR("failed pthread_create - rc(%d)", rc);
break;
}
threads.push_back(thread);
} }
// Send multi request. for (std::vector<pthread_t>::iterator iter = threads.begin(); iter != threads.end(); ++iter) {
do{ void* retval;
// Start making requests and check running. int rc;
still_running = 0;
do {
curlm_code = curl_multi_perform(hMulti, &still_running);
} while(curlm_code == CURLM_CALL_MULTI_PERFORM);
if(curlm_code != CURLM_OK) { rc = pthread_join(*iter, &retval);
S3FS_PRN_DBG("curl_multi_perform code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code)); if (rc) {
} success = false;
S3FS_PRN_ERR("failed pthread_join - rc(%d)", rc);
// Set timer when still running } else {
if(still_running) { int int_retval = (int)(intptr_t)(retval);
long milliseconds; if (int_retval) {
fd_set r_fd; S3FS_PRN_ERR("thread failed - rc(%d)", int_retval);
fd_set w_fd; success = false;
fd_set e_fd;
FD_ZERO(&r_fd);
FD_ZERO(&w_fd);
FD_ZERO(&e_fd);
if(CURLM_OK != (curlm_code = curl_multi_timeout(hMulti, &milliseconds))){
S3FS_PRN_DBG("curl_multi_timeout code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code));
}
if(milliseconds < 0){
milliseconds = 50;
}
if(milliseconds > 0) {
int max_fd;
struct timeval timeout;
timeout.tv_sec = 1000 * milliseconds / 1000000;
timeout.tv_usec = 1000 * milliseconds % 1000000;
if(CURLM_OK != (curlm_code = curl_multi_fdset(hMulti, &r_fd, &w_fd, &e_fd, &max_fd))){
S3FS_PRN_ERR("curl_multi_fdset code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code));
return -EIO;
}
if(-1 == select(max_fd + 1, &r_fd, &w_fd, &e_fd, &timeout)){
S3FS_PRN_ERR("failed select - errno(%d)", errno);
return -errno;
}
} }
} }
}while(still_running); }
return 0; return success ? 0 : -EIO;
} }
int S3fsMultiCurl::MultiRead(void) int S3fsMultiCurl::MultiRead(void)
{ {
CURLMsg* msg; for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); cMap_req.erase(iter++)) {
int remaining_messages; S3fsCurl* s3fscurl = (*iter).second;
CURL* hCurl = NULL;
S3fsCurl* s3fscurl = NULL;
S3fsCurl* retrycurl= NULL;
while(NULL != (msg = curl_multi_info_read(hMulti, &remaining_messages))){ bool isRetry = false;
if(CURLMSG_DONE != msg->msg){
S3FS_PRN_ERR("curl_multi_info_read code: %d", msg->msg);
return -EIO;
}
hCurl = msg->easy_handle;
s3fscurlmap_t::iterator iter;
if(cMap_req.end() != (iter = cMap_req.find(hCurl))){
s3fscurl = iter->second;
}else{
s3fscurl = NULL;
}
retrycurl= NULL;
if(s3fscurl){ long responseCode = -1;
bool isRetry = false; if(s3fscurl->GetResponseCode(responseCode)){
if(CURLE_OK == msg->data.result){ if(400 > responseCode){
long responseCode = -1; // add into stat cache
if(s3fscurl->GetResponseCode(responseCode)){ if(SuccessCallback && !SuccessCallback(s3fscurl)){
if(400 > responseCode){ S3FS_PRN_WARN("error from callback function(%s).", s3fscurl->url.c_str());
// add into stat cache
if(SuccessCallback && !SuccessCallback(s3fscurl)){
S3FS_PRN_WARN("error from callback function(%s).", s3fscurl->url.c_str());
}
}else if(400 == responseCode){
// as possibly in multipart
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
isRetry = true;
}else if(404 == responseCode){
// not found
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
}else if(500 == responseCode){
// case of all other result, do retry.(11/13/2013)
// because it was found that s3fs got 500 error from S3, but could success
// to retry it.
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
isRetry = true;
}else{
// Retry in other case.
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
isRetry = true;
}
}else{
S3FS_PRN_ERR("failed a request(Unknown response code: %s)", s3fscurl->url.c_str());
} }
}else if(400 == responseCode){
// as possibly in multipart
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
isRetry = true;
}else if(404 == responseCode){
// not found
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
}else if(500 == responseCode){
// case of all other result, do retry.(11/13/2013)
// because it was found that s3fs got 500 error from S3, but could success
// to retry it.
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
isRetry = true;
}else{ }else{
S3FS_PRN_WARN("failed to read(remaining: %d code: %d msg: %s), so retry this.", // Retry in other case.
remaining_messages, msg->data.result, curl_easy_strerror(msg->data.result)); S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
isRetry = true; isRetry = true;
} }
}else{
S3FS_PRN_ERR("failed a request(Unknown response code: %s)", s3fscurl->url.c_str());
}
if(!isRetry){
cMap_req.erase(hCurl);
curl_multi_remove_handle(hMulti, hCurl);
s3fscurl->DestroyCurlHandle(); if(!isRetry){
delete s3fscurl; s3fscurl->DestroyCurlHandle();
delete s3fscurl;
}else{ }else{
cMap_req.erase(hCurl); S3fsCurl* retrycurl = NULL;
curl_multi_remove_handle(hMulti, hCurl);
// For retry // For retry
if(RetryCallback){ if(RetryCallback){
if(NULL != (retrycurl = RetryCallback(s3fscurl))){ retrycurl = RetryCallback(s3fscurl);
cMap_all[retrycurl->hCurl] = retrycurl; if(NULL != retrycurl){
}else{ cMap_all[retrycurl->hCurl] = retrycurl;
// Could not set up callback. }else{
return -EIO; // Could not set up callback.
} return -EIO;
}
if(s3fscurl != retrycurl){
s3fscurl->DestroyCurlHandle();
delete s3fscurl;
} }
} }
}else{ if(s3fscurl != retrycurl){
assert(false); s3fscurl->DestroyCurlHandle();
delete s3fscurl;
}
} }
} }
return 0; return 0;
@ -3976,28 +3917,16 @@ int S3fsMultiCurl::MultiRead(void)
int S3fsMultiCurl::Request(void) int S3fsMultiCurl::Request(void)
{ {
int result; int result;
CURLMcode curlm_code;
S3FS_PRN_INFO3("[count=%zu]", cMap_all.size()); S3FS_PRN_INFO3("[count=%zu]", cMap_all.size());
if(hMulti){
S3FS_PRN_DBG("Warning: hMulti is not null, thus clear itself.");
ClearEx(false);
}
// Make request list. // Make request list.
// //
// Send multi request loop( with retry ) // Send multi request loop( with retry )
// (When many request is sends, sometimes gets "Couldn't connect to server") // (When many request is sends, sometimes gets "Couldn't connect to server")
// //
while(!cMap_all.empty()){ while(!cMap_all.empty()){
// populate the multi interface with an initial set of requests
if(NULL == (hMulti = curl_multi_init())){
Clear();
return -1;
}
// set curl handle to multi handle // set curl handle to multi handle
int cnt; int cnt;
s3fscurlmap_t::iterator iter; s3fscurlmap_t::iterator iter;
@ -4005,11 +3934,6 @@ int S3fsMultiCurl::Request(void)
CURL* hCurl = (*iter).first; CURL* hCurl = (*iter).first;
S3fsCurl* s3fscurl = (*iter).second; S3fsCurl* s3fscurl = (*iter).second;
if(CURLM_OK != (curlm_code = curl_multi_add_handle(hMulti, hCurl))){
S3FS_PRN_ERR("curl_multi_add_handle code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code));
Clear();
return -EIO;
}
cMap_req[hCurl] = s3fscurl; cMap_req[hCurl] = s3fscurl;
} }
@ -4031,6 +3955,11 @@ int S3fsMultiCurl::Request(void)
return 0; return 0;
} }
// thread function for performing an S3fsCurl request
void* S3fsMultiCurl::RequestPerformWrapper(void* arg) {
return (void*)(intptr_t)(static_cast<S3fsCurl*>(arg)->RequestPerform());
}
//------------------------------------------------------------------- //-------------------------------------------------------------------
// Utility functions // Utility functions
//------------------------------------------------------------------- //-------------------------------------------------------------------

View File

@ -441,7 +441,6 @@ class S3fsMultiCurl
private: private:
static int max_multireq; static int max_multireq;
CURLM* hMulti;
s3fscurlmap_t cMap_all; // all of curl requests s3fscurlmap_t cMap_all; // all of curl requests
s3fscurlmap_t cMap_req; // curl requests are sent s3fscurlmap_t cMap_req; // curl requests are sent
@ -453,6 +452,8 @@ class S3fsMultiCurl
int MultiPerform(void); int MultiPerform(void);
int MultiRead(void); int MultiRead(void);
static void* RequestPerformWrapper(void* arg);
public: public:
S3fsMultiCurl(); S3fsMultiCurl();
~S3fsMultiCurl(); ~S3fsMultiCurl();