mirror of
https://github.com/s3fs-fuse/s3fs-fuse.git
synced 2024-12-23 01:08:54 +00:00
Issue readdir HEAD requests without batching
Previously s3fs would issue a batch of HEAD requests and wait for all
to succeed before issuing the next batch. Now it issues the first
batch and only waits for a single call to succeed before issuing the
next call. This can improve performance when one call lags due to
network errors. I measured 25% improvement with the same level of
parallelism. This commit also reparents parallelism knobs for
consistency. Follows on to 88cd8feb05
.
Fixes #223.
This commit is contained in:
parent
beadf95975
commit
46d79c5bc2
48
src/curl.cpp
48
src/curl.cpp
@ -373,6 +373,7 @@ string S3fsCurl::curl_ca_bundle;
|
||||
mimes_t S3fsCurl::mimeTypes;
|
||||
string S3fsCurl::userAgent;
|
||||
int S3fsCurl::max_parallel_cnt = 5; // default
|
||||
int S3fsCurl::max_multireq = 20; // default
|
||||
off_t S3fsCurl::multipart_size = MULTIPART_SIZE; // default
|
||||
bool S3fsCurl::is_sigv4 = true; // default
|
||||
bool S3fsCurl::is_ua = true; // default
|
||||
@ -1258,6 +1259,13 @@ int S3fsCurl::SetMaxParallelCount(int value)
|
||||
return old;
|
||||
}
|
||||
|
||||
int S3fsCurl::SetMaxMultiRequest(int max)
|
||||
{
|
||||
int old = S3fsCurl::max_multireq;
|
||||
S3fsCurl::max_multireq = max;
|
||||
return old;
|
||||
}
|
||||
|
||||
bool S3fsCurl::UploadMultipartPostCallback(S3fsCurl* s3fscurl)
|
||||
{
|
||||
if(!s3fscurl){
|
||||
@ -1342,7 +1350,7 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta,
|
||||
s3fscurl.DestroyCurlHandle();
|
||||
|
||||
// Initialize S3fsMultiCurl
|
||||
S3fsMultiCurl curlmulti;
|
||||
S3fsMultiCurl curlmulti(GetMaxParallelCount());
|
||||
curlmulti.SetSuccessCallback(S3fsCurl::UploadMultipartPostCallback);
|
||||
curlmulti.SetRetryCallback(S3fsCurl::UploadMultipartPostRetryCallback);
|
||||
|
||||
@ -1432,7 +1440,7 @@ int S3fsCurl::ParallelGetObjectRequest(const char* tpath, int fd, off_t start, s
|
||||
|
||||
// cycle through open fd, pulling off 10MB chunks at a time
|
||||
for(remaining_bytes = size; 0 < remaining_bytes; ){
|
||||
S3fsMultiCurl curlmulti;
|
||||
S3fsMultiCurl curlmulti(GetMaxParallelCount());
|
||||
int para_cnt;
|
||||
off_t chunk;
|
||||
|
||||
@ -3846,27 +3854,13 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t
|
||||
return 0;
|
||||
}
|
||||
|
||||
//-------------------------------------------------------------------
|
||||
// Class S3fsMultiCurl
|
||||
//-------------------------------------------------------------------
|
||||
static const int MAX_MULTI_HEADREQ = 20; // default: max request count in readdir curl_multi.
|
||||
|
||||
//-------------------------------------------------------------------
|
||||
// Class method for S3fsMultiCurl
|
||||
//-------------------------------------------------------------------
|
||||
int S3fsMultiCurl::max_multireq = MAX_MULTI_HEADREQ;
|
||||
|
||||
int S3fsMultiCurl::SetMaxMultiRequest(int max)
|
||||
{
|
||||
int old = S3fsMultiCurl::max_multireq;
|
||||
S3fsMultiCurl::max_multireq= max;
|
||||
return old;
|
||||
}
|
||||
|
||||
//-------------------------------------------------------------------
|
||||
// method for S3fsMultiCurl
|
||||
//-------------------------------------------------------------------
|
||||
S3fsMultiCurl::S3fsMultiCurl() : SuccessCallback(NULL), RetryCallback(NULL)
|
||||
S3fsMultiCurl::S3fsMultiCurl(int maxParallelism)
|
||||
: maxParallelism(maxParallelism)
|
||||
, SuccessCallback(NULL)
|
||||
, RetryCallback(NULL)
|
||||
{
|
||||
}
|
||||
|
||||
@ -3929,7 +3923,7 @@ int S3fsMultiCurl::MultiPerform(void)
|
||||
std::vector<pthread_t> threads;
|
||||
bool success = true;
|
||||
bool isMultiHead = false;
|
||||
Semaphore sem(S3fsCurl::max_parallel_cnt);
|
||||
Semaphore sem(GetMaxParallelism());
|
||||
int rc;
|
||||
|
||||
for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); ++iter) {
|
||||
@ -3974,17 +3968,10 @@ int S3fsMultiCurl::MultiPerform(void)
|
||||
threads.push_back(thread);
|
||||
}
|
||||
|
||||
for(int i = 0; i < S3fsCurl::max_parallel_cnt; ++i){
|
||||
for(int i = 0; i < sem.get_value(); ++i){
|
||||
sem.wait();
|
||||
}
|
||||
|
||||
#ifdef __APPLE__
|
||||
// macOS cannot destroy a semaphore with posts less than the initializer
|
||||
for(int i = 0; i < S3fsCurl::max_parallel_cnt; ++i){
|
||||
sem.post();
|
||||
}
|
||||
#endif
|
||||
|
||||
for (std::vector<pthread_t>::iterator titer = threads.begin(); titer != threads.end(); ++titer) {
|
||||
void* retval;
|
||||
|
||||
@ -4081,9 +4068,8 @@ int S3fsMultiCurl::Request(void)
|
||||
while(!cMap_all.empty()){
|
||||
// set curl handle to multi handle
|
||||
int result;
|
||||
int cnt;
|
||||
s3fscurlmap_t::iterator iter;
|
||||
for(cnt = 0, iter = cMap_all.begin(); cnt < S3fsMultiCurl::max_multireq && iter != cMap_all.end(); cMap_all.erase(iter++), cnt++){
|
||||
for(iter = cMap_all.begin(); iter != cMap_all.end(); cMap_all.erase(iter++)){
|
||||
CURL* hCurl = (*iter).first;
|
||||
S3fsCurl* s3fscurl = (*iter).second;
|
||||
|
||||
|
12
src/curl.h
12
src/curl.h
@ -249,6 +249,7 @@ class S3fsCurl
|
||||
static mimes_t mimeTypes;
|
||||
static std::string userAgent;
|
||||
static int max_parallel_cnt;
|
||||
static int max_multireq;
|
||||
static off_t multipart_size;
|
||||
static bool is_sigv4;
|
||||
static bool is_ua; // User-Agent
|
||||
@ -389,8 +390,12 @@ class S3fsCurl
|
||||
}
|
||||
static long SetSslVerifyHostname(long value);
|
||||
static long GetSslVerifyHostname(void) { return S3fsCurl::ssl_verify_hostname; }
|
||||
// maximum parallel GET and PUT requests
|
||||
static int SetMaxParallelCount(int value);
|
||||
static int GetMaxParallelCount(void) { return S3fsCurl::max_parallel_cnt; }
|
||||
// maximum parallel HEAD requests
|
||||
static int SetMaxMultiRequest(int max);
|
||||
static int GetMaxMultiRequest(void) { return S3fsCurl::max_multireq; }
|
||||
static bool SetIsECS(bool flag);
|
||||
static bool SetIsIBMIAMAuth(bool flag);
|
||||
static size_t SetIAMFieldCount(size_t field_count);
|
||||
@ -470,7 +475,7 @@ typedef S3fsCurl* (*S3fsMultiRetryCallback)(S3fsCurl* s3fscurl); // callback for
|
||||
class S3fsMultiCurl
|
||||
{
|
||||
private:
|
||||
static int max_multireq;
|
||||
const int maxParallelism;
|
||||
|
||||
s3fscurlmap_t cMap_all; // all of curl requests
|
||||
s3fscurlmap_t cMap_req; // curl requests are sent
|
||||
@ -486,11 +491,10 @@ class S3fsMultiCurl
|
||||
static void* RequestPerformWrapper(void* arg);
|
||||
|
||||
public:
|
||||
S3fsMultiCurl();
|
||||
explicit S3fsMultiCurl(int maxParallelism);
|
||||
~S3fsMultiCurl();
|
||||
|
||||
static int SetMaxMultiRequest(int max);
|
||||
static int GetMaxMultiRequest(void) { return S3fsMultiCurl::max_multireq; }
|
||||
int GetMaxParallelism() { return maxParallelism; }
|
||||
|
||||
S3fsMultiSuccessCallback SetSuccessCallback(S3fsMultiSuccessCallback function);
|
||||
S3fsMultiRetryCallback SetRetryCallback(S3fsMultiRetryCallback function);
|
||||
|
@ -30,11 +30,19 @@
|
||||
class Semaphore
|
||||
{
|
||||
public:
|
||||
explicit Semaphore(int value) : sem(dispatch_semaphore_create(value)) {}
|
||||
~Semaphore() { dispatch_release(sem); }
|
||||
explicit Semaphore(int value) : value(value), sem(dispatch_semaphore_create(value)) {}
|
||||
~Semaphore() {
|
||||
// macOS cannot destroy a semaphore with posts less than the initializer
|
||||
for(int i = 0; i < get_value(); ++i){
|
||||
post();
|
||||
}
|
||||
dispatch_release(sem);
|
||||
}
|
||||
void wait() { dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); }
|
||||
void post() { dispatch_semaphore_signal(sem); }
|
||||
int get_value() const { return value; }
|
||||
private:
|
||||
const int value;
|
||||
dispatch_semaphore_t sem;
|
||||
};
|
||||
|
||||
@ -46,7 +54,7 @@ class Semaphore
|
||||
class Semaphore
|
||||
{
|
||||
public:
|
||||
explicit Semaphore(int value) { sem_init(&mutex, 0, value); }
|
||||
explicit Semaphore(int value) : value(value) { sem_init(&mutex, 0, value); }
|
||||
~Semaphore() { sem_destroy(&mutex); }
|
||||
void wait()
|
||||
{
|
||||
@ -56,7 +64,9 @@ class Semaphore
|
||||
} while (r == -1 && errno == EINTR);
|
||||
}
|
||||
void post() { sem_post(&mutex); }
|
||||
int get_value() const { return value; }
|
||||
private:
|
||||
const int value;
|
||||
sem_t mutex;
|
||||
};
|
||||
|
||||
|
16
src/s3fs.cpp
16
src/s3fs.cpp
@ -2358,7 +2358,7 @@ static S3fsCurl* multi_head_retry_callback(S3fsCurl* s3fscurl)
|
||||
|
||||
static int readdir_multi_head(const char* path, S3ObjList& head, void* buf, fuse_fill_dir_t filler)
|
||||
{
|
||||
S3fsMultiCurl curlmulti;
|
||||
S3fsMultiCurl curlmulti(S3fsCurl::GetMaxMultiRequest());
|
||||
s3obj_list_t headlist;
|
||||
s3obj_list_t fillerlist;
|
||||
int result = 0;
|
||||
@ -2372,14 +2372,12 @@ static int readdir_multi_head(const char* path, S3ObjList& head, void* buf, fuse
|
||||
curlmulti.SetSuccessCallback(multi_head_callback);
|
||||
curlmulti.SetRetryCallback(multi_head_retry_callback);
|
||||
|
||||
// Loop
|
||||
while(!headlist.empty()){
|
||||
// TODO: deindent
|
||||
s3obj_list_t::iterator iter;
|
||||
long cnt;
|
||||
|
||||
fillerlist.clear();
|
||||
// Make single head request(with max).
|
||||
for(iter = headlist.begin(), cnt = 0; headlist.end() != iter && cnt < S3fsMultiCurl::GetMaxMultiRequest(); iter = headlist.erase(iter)){
|
||||
for(iter = headlist.begin(); headlist.end() != iter; iter = headlist.erase(iter)){
|
||||
string disppath = path + (*iter);
|
||||
string etag = head.GetETag((*iter).c_str());
|
||||
|
||||
@ -2407,7 +2405,6 @@ static int readdir_multi_head(const char* path, S3ObjList& head, void* buf, fuse
|
||||
delete s3fscurl;
|
||||
continue;
|
||||
}
|
||||
cnt++; // max request count within S3fsMultiCurl::GetMaxMultiRequest()
|
||||
}
|
||||
|
||||
// Multi request
|
||||
@ -2420,7 +2417,7 @@ static int readdir_multi_head(const char* path, S3ObjList& head, void* buf, fuse
|
||||
result = 0;
|
||||
}else{
|
||||
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
|
||||
break;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
@ -2438,9 +2435,6 @@ static int readdir_multi_head(const char* path, S3ObjList& head, void* buf, fuse
|
||||
}
|
||||
}
|
||||
|
||||
// reinit for loop.
|
||||
curlmulti.Clear();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -4445,7 +4439,7 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar
|
||||
}
|
||||
if(0 == STR2NCMP(arg, "multireq_max=")){
|
||||
long maxreq = static_cast<long>(s3fs_strtoofft(strchr(arg, '=') + sizeof(char)));
|
||||
S3fsMultiCurl::SetMaxMultiRequest(maxreq);
|
||||
S3fsCurl::SetMaxMultiRequest(maxreq);
|
||||
return 0;
|
||||
}
|
||||
if(0 == strcmp(arg, "nonempty")){
|
||||
|
Loading…
Reference in New Issue
Block a user