mirror of
https://github.com/s3fs-fuse/s3fs-fuse.git
synced 2025-04-02 14:41:52 +00:00
Merge pull request #946 from gaul/async/completed-tids
Simplify async request completion code
This commit is contained in:
commit
d8185a25aa
50
src/curl.cpp
50
src/curl.cpp
@ -1728,7 +1728,7 @@ S3fsCurl::S3fsCurl(bool ahbe) :
|
|||||||
bodydata(NULL), headdata(NULL), LastResponseCode(-1), postdata(NULL), postdata_remaining(0), is_use_ahbe(ahbe),
|
bodydata(NULL), headdata(NULL), LastResponseCode(-1), postdata(NULL), postdata_remaining(0), is_use_ahbe(ahbe),
|
||||||
retry_count(0), b_infile(NULL), b_postdata(NULL), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0),
|
retry_count(0), b_infile(NULL), b_postdata(NULL), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0),
|
||||||
b_ssekey_pos(-1), b_ssevalue(""), b_ssetype(SSE_DISABLE), op(""), query_string(""),
|
b_ssekey_pos(-1), b_ssevalue(""), b_ssetype(SSE_DISABLE), op(""), query_string(""),
|
||||||
sem(NULL)
|
sem(NULL), completed_tids_lock(NULL), completed_tids(NULL)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3934,11 +3934,19 @@ S3fsMultiCurl::S3fsMultiCurl(int maxParallelism)
|
|||||||
, SuccessCallback(NULL)
|
, SuccessCallback(NULL)
|
||||||
, RetryCallback(NULL)
|
, RetryCallback(NULL)
|
||||||
{
|
{
|
||||||
|
int res;
|
||||||
|
if (0 != (res = pthread_mutex_init(&completed_tids_lock, NULL))) {
|
||||||
|
S3FS_PRN_ERR("could not initialize completed_tids_lock: %i", res);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
S3fsMultiCurl::~S3fsMultiCurl()
|
S3fsMultiCurl::~S3fsMultiCurl()
|
||||||
{
|
{
|
||||||
Clear();
|
Clear();
|
||||||
|
int res;
|
||||||
|
if(0 != (res = pthread_mutex_destroy(&completed_tids_lock))){
|
||||||
|
S3FS_PRN_ERR("could not destroy completed_tids_lock: %i", res);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool S3fsMultiCurl::ClearEx(bool is_all)
|
bool S3fsMultiCurl::ClearEx(bool is_all)
|
||||||
@ -4002,31 +4010,29 @@ int S3fsMultiCurl::MultiPerform()
|
|||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
S3fsCurl* s3fscurl = (*iter).second;
|
S3fsCurl* s3fscurl = (*iter).second;
|
||||||
s3fscurl->sem = &sem;
|
s3fscurl->sem = &sem;
|
||||||
|
s3fscurl->completed_tids_lock = &completed_tids_lock;
|
||||||
|
s3fscurl->completed_tids = &completed_tids;
|
||||||
|
|
||||||
sem.wait();
|
sem.wait();
|
||||||
|
|
||||||
#ifndef __APPLE__
|
{
|
||||||
// macOS does not support pthread_tryjoin_np so we do not eagerly reap threads
|
AutoLock lock(&completed_tids_lock);
|
||||||
for (std::vector<pthread_t>::iterator titer = threads.begin(); titer != threads.end(); ++titer) {
|
for(std::vector<pthread_t>::iterator it = completed_tids.begin(); it != completed_tids.end(); ++it){
|
||||||
void* retval;
|
void* retval;
|
||||||
|
|
||||||
rc = pthread_tryjoin_np(*titer, &retval);
|
rc = pthread_join(*it, &retval);
|
||||||
if (rc == 0) {
|
if (rc) {
|
||||||
titer = threads.erase(titer);
|
success = false;
|
||||||
int int_retval = (int)(intptr_t)(retval);
|
S3FS_PRN_ERR("failed pthread_join - rc(%d) %s", rc, strerror(rc));
|
||||||
if (int_retval && !(int_retval == -ENOENT && isMultiHead)) {
|
} else {
|
||||||
S3FS_PRN_WARN("thread failed - rc(%d)", int_retval);
|
int int_retval = (int)(intptr_t)(retval);
|
||||||
|
if (int_retval && !(int_retval == -ENOENT && isMultiHead)) {
|
||||||
|
S3FS_PRN_WARN("thread failed - rc(%d)", int_retval);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
} else if (rc == EBUSY) {
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
titer = threads.erase(titer);
|
|
||||||
success = false;
|
|
||||||
S3FS_PRN_ERR("failed pthread_tryjoin_np - rc(%d)", rc);
|
|
||||||
}
|
}
|
||||||
|
completed_tids.clear();
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
isMultiHead |= s3fscurl->GetOp() == "HEAD";
|
isMultiHead |= s3fscurl->GetOp() == "HEAD";
|
||||||
|
|
||||||
@ -4044,7 +4050,8 @@ int S3fsMultiCurl::MultiPerform()
|
|||||||
sem.wait();
|
sem.wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
for (std::vector<pthread_t>::iterator titer = threads.begin(); titer != threads.end(); ++titer) {
|
AutoLock lock(&completed_tids_lock);
|
||||||
|
for (std::vector<pthread_t>::iterator titer = completed_tids.begin(); titer != completed_tids.end(); ++titer) {
|
||||||
void* retval;
|
void* retval;
|
||||||
|
|
||||||
rc = pthread_join(*titer, &retval);
|
rc = pthread_join(*titer, &retval);
|
||||||
@ -4058,6 +4065,7 @@ int S3fsMultiCurl::MultiPerform()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
completed_tids.clear();
|
||||||
|
|
||||||
return success ? 0 : -EIO;
|
return success ? 0 : -EIO;
|
||||||
}
|
}
|
||||||
@ -4170,6 +4178,8 @@ int S3fsMultiCurl::Request()
|
|||||||
void* S3fsMultiCurl::RequestPerformWrapper(void* arg) {
|
void* S3fsMultiCurl::RequestPerformWrapper(void* arg) {
|
||||||
S3fsCurl* s3fscurl = static_cast<S3fsCurl*>(arg);
|
S3fsCurl* s3fscurl = static_cast<S3fsCurl*>(arg);
|
||||||
void *result = (void*)(intptr_t)(s3fscurl->RequestPerform());
|
void *result = (void*)(intptr_t)(s3fscurl->RequestPerform());
|
||||||
|
AutoLock lock(s3fscurl->completed_tids_lock);
|
||||||
|
s3fscurl->completed_tids->push_back(pthread_self());
|
||||||
s3fscurl->sem->post();
|
s3fscurl->sem->post();
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -282,6 +282,8 @@ class S3fsCurl
|
|||||||
std::string op; // the HTTP verb of the request ("PUT", "GET", etc.)
|
std::string op; // the HTTP verb of the request ("PUT", "GET", etc.)
|
||||||
std::string query_string; // request query string
|
std::string query_string; // request query string
|
||||||
Semaphore *sem;
|
Semaphore *sem;
|
||||||
|
pthread_mutex_t *completed_tids_lock;
|
||||||
|
std::vector<pthread_t> *completed_tids;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
// constructor/destructor
|
// constructor/destructor
|
||||||
@ -486,6 +488,9 @@ class S3fsMultiCurl
|
|||||||
S3fsMultiSuccessCallback SuccessCallback;
|
S3fsMultiSuccessCallback SuccessCallback;
|
||||||
S3fsMultiRetryCallback RetryCallback;
|
S3fsMultiRetryCallback RetryCallback;
|
||||||
|
|
||||||
|
pthread_mutex_t completed_tids_lock;
|
||||||
|
std::vector<pthread_t> completed_tids;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool ClearEx(bool is_all);
|
bool ClearEx(bool is_all);
|
||||||
int MultiPerform(void);
|
int MultiPerform(void);
|
||||||
|
@ -2373,7 +2373,6 @@ static int readdir_multi_head(const char* path, S3ObjList& head, void* buf, fuse
|
|||||||
curlmulti.SetSuccessCallback(multi_head_callback);
|
curlmulti.SetSuccessCallback(multi_head_callback);
|
||||||
curlmulti.SetRetryCallback(multi_head_retry_callback);
|
curlmulti.SetRetryCallback(multi_head_retry_callback);
|
||||||
|
|
||||||
// TODO: deindent
|
|
||||||
s3obj_list_t::iterator iter;
|
s3obj_list_t::iterator iter;
|
||||||
|
|
||||||
fillerlist.clear();
|
fillerlist.clear();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user