Merge pull request #552 from orozery/foreground_threads

switch S3fsMultiCurl to use foreground threads
This commit is contained in:
Takeshi Nakatani 2017-04-16 19:05:16 +09:00 committed by GitHub
commit 6bd179c92b
2 changed files with 82 additions and 152 deletions

View File

@ -3759,7 +3759,7 @@ int S3fsMultiCurl::SetMaxMultiRequest(int max)
//-------------------------------------------------------------------
// method for S3fsMultiCurl
//-------------------------------------------------------------------
S3fsMultiCurl::S3fsMultiCurl() : hMulti(NULL), SuccessCallback(NULL), RetryCallback(NULL)
S3fsMultiCurl::S3fsMultiCurl() : SuccessCallback(NULL), RetryCallback(NULL)
{
}
@ -3772,22 +3772,13 @@ bool S3fsMultiCurl::ClearEx(bool is_all)
{
s3fscurlmap_t::iterator iter;
for(iter = cMap_req.begin(); iter != cMap_req.end(); cMap_req.erase(iter++)){
CURL* hCurl = (*iter).first;
S3fsCurl* s3fscurl = (*iter).second;
if(hMulti && hCurl){
curl_multi_remove_handle(hMulti, hCurl);
}
if(s3fscurl){
s3fscurl->DestroyCurlHandle();
delete s3fscurl; // with destroy curl handle.
}
}
if(hMulti){
curl_multi_cleanup(hMulti);
hMulti = NULL;
}
if(is_all){
for(iter = cMap_all.begin(); iter != cMap_all.end(); cMap_all.erase(iter++)){
S3fsCurl* s3fscurl = (*iter).second;
@ -3816,10 +3807,6 @@ S3fsMultiRetryCallback S3fsMultiCurl::SetRetryCallback(S3fsMultiRetryCallback fu
bool S3fsMultiCurl::SetS3fsCurlObject(S3fsCurl* s3fscurl)
{
if(hMulti){
S3FS_PRN_ERR("Internal error: hMulti is not null");
return false;
}
if(!s3fscurl){
return false;
}
@ -3832,148 +3819,102 @@ bool S3fsMultiCurl::SetS3fsCurlObject(S3fsCurl* s3fscurl)
int S3fsMultiCurl::MultiPerform(void)
{
CURLMcode curlm_code;
int still_running;
std::vector<pthread_t> threads;
bool success = true;
if(!hMulti){
return -1;
for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); ++iter) {
pthread_t thread;
S3fsCurl* s3fscurl = (*iter).second;
int rc;
rc = pthread_create(&thread, NULL, S3fsMultiCurl::RequestPerformWrapper, static_cast<void*>(s3fscurl));
if (rc != 0) {
success = false;
S3FS_PRN_ERR("failed pthread_create - rc(%d)", rc);
break;
}
threads.push_back(thread);
}
// Send multi request.
do{
// Start making requests and check running.
still_running = 0;
do {
curlm_code = curl_multi_perform(hMulti, &still_running);
} while(curlm_code == CURLM_CALL_MULTI_PERFORM);
for (std::vector<pthread_t>::iterator iter = threads.begin(); iter != threads.end(); ++iter) {
void* retval;
int rc;
if(curlm_code != CURLM_OK) {
S3FS_PRN_DBG("curl_multi_perform code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code));
}
// Set timer when still running
if(still_running) {
long milliseconds;
fd_set r_fd;
fd_set w_fd;
fd_set e_fd;
FD_ZERO(&r_fd);
FD_ZERO(&w_fd);
FD_ZERO(&e_fd);
if(CURLM_OK != (curlm_code = curl_multi_timeout(hMulti, &milliseconds))){
S3FS_PRN_DBG("curl_multi_timeout code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code));
}
if(milliseconds < 0){
milliseconds = 50;
}
if(milliseconds > 0) {
int max_fd;
struct timeval timeout;
timeout.tv_sec = 1000 * milliseconds / 1000000;
timeout.tv_usec = 1000 * milliseconds % 1000000;
if(CURLM_OK != (curlm_code = curl_multi_fdset(hMulti, &r_fd, &w_fd, &e_fd, &max_fd))){
S3FS_PRN_ERR("curl_multi_fdset code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code));
return -EIO;
}
if(-1 == select(max_fd + 1, &r_fd, &w_fd, &e_fd, &timeout)){
S3FS_PRN_ERR("failed select - errno(%d)", errno);
return -errno;
}
rc = pthread_join(*iter, &retval);
if (rc) {
success = false;
S3FS_PRN_ERR("failed pthread_join - rc(%d)", rc);
} else {
int int_retval = (int)(intptr_t)(retval);
if (int_retval) {
S3FS_PRN_ERR("thread failed - rc(%d)", int_retval);
success = false;
}
}
}while(still_running);
}
return 0;
return success ? 0 : -EIO;
}
int S3fsMultiCurl::MultiRead(void)
{
CURLMsg* msg;
int remaining_messages;
CURL* hCurl = NULL;
S3fsCurl* s3fscurl = NULL;
S3fsCurl* retrycurl= NULL;
for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); cMap_req.erase(iter++)) {
S3fsCurl* s3fscurl = (*iter).second;
while(NULL != (msg = curl_multi_info_read(hMulti, &remaining_messages))){
if(CURLMSG_DONE != msg->msg){
S3FS_PRN_ERR("curl_multi_info_read code: %d", msg->msg);
return -EIO;
}
hCurl = msg->easy_handle;
s3fscurlmap_t::iterator iter;
if(cMap_req.end() != (iter = cMap_req.find(hCurl))){
s3fscurl = iter->second;
}else{
s3fscurl = NULL;
}
retrycurl= NULL;
bool isRetry = false;
if(s3fscurl){
bool isRetry = false;
if(CURLE_OK == msg->data.result){
long responseCode = -1;
if(s3fscurl->GetResponseCode(responseCode)){
if(400 > responseCode){
// add into stat cache
if(SuccessCallback && !SuccessCallback(s3fscurl)){
S3FS_PRN_WARN("error from callback function(%s).", s3fscurl->url.c_str());
}
}else if(400 == responseCode){
// as possibly in multipart
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
isRetry = true;
}else if(404 == responseCode){
// not found
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
}else if(500 == responseCode){
// case of all other result, do retry.(11/13/2013)
// because it was found that s3fs got 500 error from S3, but could success
// to retry it.
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
isRetry = true;
}else{
// Retry in other case.
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
isRetry = true;
}
}else{
S3FS_PRN_ERR("failed a request(Unknown response code: %s)", s3fscurl->url.c_str());
long responseCode = -1;
if(s3fscurl->GetResponseCode(responseCode)){
if(400 > responseCode){
// add into stat cache
if(SuccessCallback && !SuccessCallback(s3fscurl)){
S3FS_PRN_WARN("error from callback function(%s).", s3fscurl->url.c_str());
}
}else if(400 == responseCode){
// as possibly in multipart
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
isRetry = true;
}else if(404 == responseCode){
// not found
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
}else if(500 == responseCode){
// case of all other result, do retry.(11/13/2013)
// because it was found that s3fs got 500 error from S3, but could success
// to retry it.
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
isRetry = true;
}else{
S3FS_PRN_WARN("failed to read(remaining: %d code: %d msg: %s), so retry this.",
remaining_messages, msg->data.result, curl_easy_strerror(msg->data.result));
// Retry in other case.
S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
isRetry = true;
}
}else{
S3FS_PRN_ERR("failed a request(Unknown response code: %s)", s3fscurl->url.c_str());
}
if(!isRetry){
cMap_req.erase(hCurl);
curl_multi_remove_handle(hMulti, hCurl);
s3fscurl->DestroyCurlHandle();
delete s3fscurl;
if(!isRetry){
s3fscurl->DestroyCurlHandle();
delete s3fscurl;
}else{
cMap_req.erase(hCurl);
curl_multi_remove_handle(hMulti, hCurl);
}else{
S3fsCurl* retrycurl = NULL;
// For retry
if(RetryCallback){
if(NULL != (retrycurl = RetryCallback(s3fscurl))){
cMap_all[retrycurl->hCurl] = retrycurl;
}else{
// Could not set up callback.
return -EIO;
}
}
if(s3fscurl != retrycurl){
s3fscurl->DestroyCurlHandle();
delete s3fscurl;
// For retry
if(RetryCallback){
retrycurl = RetryCallback(s3fscurl);
if(NULL != retrycurl){
cMap_all[retrycurl->hCurl] = retrycurl;
}else{
// Could not set up callback.
return -EIO;
}
}
}else{
assert(false);
if(s3fscurl != retrycurl){
s3fscurl->DestroyCurlHandle();
delete s3fscurl;
}
}
}
return 0;
@ -3981,28 +3922,16 @@ int S3fsMultiCurl::MultiRead(void)
int S3fsMultiCurl::Request(void)
{
int result;
CURLMcode curlm_code;
int result;
S3FS_PRN_INFO3("[count=%zu]", cMap_all.size());
if(hMulti){
S3FS_PRN_DBG("Warning: hMulti is not null, thus clear itself.");
ClearEx(false);
}
// Make request list.
//
// Send multi request loop( with retry )
// (When many request is sends, sometimes gets "Couldn't connect to server")
//
while(!cMap_all.empty()){
// populate the multi interface with an initial set of requests
if(NULL == (hMulti = curl_multi_init())){
Clear();
return -1;
}
// set curl handle to multi handle
int cnt;
s3fscurlmap_t::iterator iter;
@ -4010,11 +3939,6 @@ int S3fsMultiCurl::Request(void)
CURL* hCurl = (*iter).first;
S3fsCurl* s3fscurl = (*iter).second;
if(CURLM_OK != (curlm_code = curl_multi_add_handle(hMulti, hCurl))){
S3FS_PRN_ERR("curl_multi_add_handle code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code));
Clear();
return -EIO;
}
cMap_req[hCurl] = s3fscurl;
}
@ -4036,6 +3960,11 @@ int S3fsMultiCurl::Request(void)
return 0;
}
// thread function for performing an S3fsCurl request
void* S3fsMultiCurl::RequestPerformWrapper(void* arg) {
return (void*)(intptr_t)(static_cast<S3fsCurl*>(arg)->RequestPerform());
}
//-------------------------------------------------------------------
// Utility functions
//-------------------------------------------------------------------

View File

@ -441,7 +441,6 @@ class S3fsMultiCurl
private:
static int max_multireq;
CURLM* hMulti;
s3fscurlmap_t cMap_all; // all of curl requests
s3fscurlmap_t cMap_req; // curl requests are sent
@ -453,6 +452,8 @@ class S3fsMultiCurl
int MultiPerform(void);
int MultiRead(void);
static void* RequestPerformWrapper(void* arg);
public:
S3fsMultiCurl();
~S3fsMultiCurl();