Simplify async request completion code

Workers now notify the master thread when they complete, unifying the
Linux and macOS code paths.  This also avoids excessive
pthread_tryjoin_np calls.  Follows on to
88cd8feb05.
This commit is contained in:
Andrew Gaul 2019-01-28 12:14:04 -08:00
parent a442e843be
commit 8c527c3616
3 changed files with 35 additions and 21 deletions

View File

@ -1689,7 +1689,7 @@ S3fsCurl::S3fsCurl(bool ahbe) :
bodydata(NULL), headdata(NULL), LastResponseCode(-1), postdata(NULL), postdata_remaining(0), is_use_ahbe(ahbe),
retry_count(0), b_infile(NULL), b_postdata(NULL), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0),
b_ssekey_pos(-1), b_ssevalue(""), b_ssetype(SSE_DISABLE), op(""), query_string(""),
sem(NULL)
sem(NULL), completed_tids_lock(NULL), completed_tids(NULL)
{
}
@ -3859,11 +3859,19 @@ S3fsMultiCurl::S3fsMultiCurl(int maxParallelism)
, SuccessCallback(NULL)
, RetryCallback(NULL)
{
int res;
if (0 != (res = pthread_mutex_init(&completed_tids_lock, NULL))) {
S3FS_PRN_ERR("could not initialize completed_tids_lock: %i", res);
}
}
S3fsMultiCurl::~S3fsMultiCurl()
{
Clear();
int res;
if(0 != (res = pthread_mutex_destroy(&completed_tids_lock))){
S3FS_PRN_ERR("could not destroy completed_tids_lock: %i", res);
}
}
bool S3fsMultiCurl::ClearEx(bool is_all)
@ -3927,31 +3935,29 @@ int S3fsMultiCurl::MultiPerform(void)
pthread_t thread;
S3fsCurl* s3fscurl = (*iter).second;
s3fscurl->sem = &sem;
s3fscurl->completed_tids_lock = &completed_tids_lock;
s3fscurl->completed_tids = &completed_tids;
sem.wait();
#ifndef __APPLE__
// macOS does not support pthread_tryjoin_np so we do not eagerly reap threads
for (std::vector<pthread_t>::iterator titer = threads.begin(); titer != threads.end(); ++titer) {
{
AutoLock lock(&completed_tids_lock);
for(std::vector<pthread_t>::iterator it = completed_tids.begin(); it != completed_tids.end(); ++it){
void* retval;
rc = pthread_tryjoin_np(*titer, &retval);
if (rc == 0) {
titer = threads.erase(titer);
rc = pthread_join(*it, &retval);
if (rc) {
success = false;
S3FS_PRN_ERR("failed pthread_join - rc(%d) %s", rc, strerror(rc));
} else {
int int_retval = (int)(intptr_t)(retval);
if (int_retval && !(int_retval == -ENOENT && isMultiHead)) {
S3FS_PRN_WARN("thread failed - rc(%d)", int_retval);
}
break;
} else if (rc == EBUSY) {
continue;
} else {
titer = threads.erase(titer);
success = false;
S3FS_PRN_ERR("failed pthread_tryjoin_np - rc(%d)", rc);
}
}
#endif
completed_tids.clear();
}
isMultiHead |= s3fscurl->GetOp() == "HEAD";
@ -3969,7 +3975,8 @@ int S3fsMultiCurl::MultiPerform(void)
sem.wait();
}
for (std::vector<pthread_t>::iterator titer = threads.begin(); titer != threads.end(); ++titer) {
AutoLock lock(&completed_tids_lock);
for (std::vector<pthread_t>::iterator titer = completed_tids.begin(); titer != completed_tids.end(); ++titer) {
void* retval;
rc = pthread_join(*titer, &retval);
@ -3983,6 +3990,7 @@ int S3fsMultiCurl::MultiPerform(void)
}
}
}
completed_tids.clear();
return success ? 0 : -EIO;
}
@ -4095,6 +4103,8 @@ int S3fsMultiCurl::Request(void)
void* S3fsMultiCurl::RequestPerformWrapper(void* arg) {
S3fsCurl* s3fscurl = static_cast<S3fsCurl*>(arg);
void *result = (void*)(intptr_t)(s3fscurl->RequestPerform());
AutoLock lock(s3fscurl->completed_tids_lock);
s3fscurl->completed_tids->push_back(pthread_self());
s3fscurl->sem->post();
return result;
}

View File

@ -282,6 +282,8 @@ class S3fsCurl
std::string op; // the HTTP verb of the request ("PUT", "GET", etc.)
std::string query_string; // request query string
Semaphore *sem;
pthread_mutex_t *completed_tids_lock;
std::vector<pthread_t> *completed_tids;
public:
// constructor/destructor
@ -483,6 +485,9 @@ class S3fsMultiCurl
S3fsMultiSuccessCallback SuccessCallback;
S3fsMultiRetryCallback RetryCallback;
pthread_mutex_t completed_tids_lock;
std::vector<pthread_t> completed_tids;
private:
bool ClearEx(bool is_all);
int MultiPerform(void);

View File

@ -2373,7 +2373,6 @@ static int readdir_multi_head(const char* path, S3ObjList& head, void* buf, fuse
curlmulti.SetSuccessCallback(multi_head_callback);
curlmulti.SetRetryCallback(multi_head_retry_callback);
// TODO: deindent
s3obj_list_t::iterator iter;
fillerlist.clear();