mirror of
https://github.com/s3fs-fuse/s3fs-fuse.git
synced 2025-01-08 15:44:11 +00:00
Merge pull request #914 from gaul/readdir/head-of-line
Issue readdir HEAD requests without batching
This commit is contained in:
commit
fada95f58e
48
src/curl.cpp
48
src/curl.cpp
@ -374,6 +374,7 @@ string S3fsCurl::curl_ca_bundle;
|
||||
mimes_t S3fsCurl::mimeTypes;
|
||||
string S3fsCurl::userAgent;
|
||||
int S3fsCurl::max_parallel_cnt = 5; // default
|
||||
int S3fsCurl::max_multireq = 20; // default
|
||||
off_t S3fsCurl::multipart_size = MULTIPART_SIZE; // default
|
||||
bool S3fsCurl::is_sigv4 = true; // default
|
||||
bool S3fsCurl::is_ua = true; // default
|
||||
@ -1259,6 +1260,13 @@ int S3fsCurl::SetMaxParallelCount(int value)
|
||||
return old;
|
||||
}
|
||||
|
||||
int S3fsCurl::SetMaxMultiRequest(int max)
|
||||
{
|
||||
int old = S3fsCurl::max_multireq;
|
||||
S3fsCurl::max_multireq = max;
|
||||
return old;
|
||||
}
|
||||
|
||||
bool S3fsCurl::UploadMultipartPostCallback(S3fsCurl* s3fscurl)
|
||||
{
|
||||
if(!s3fscurl){
|
||||
@ -1343,7 +1351,7 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta,
|
||||
s3fscurl.DestroyCurlHandle();
|
||||
|
||||
// Initialize S3fsMultiCurl
|
||||
S3fsMultiCurl curlmulti;
|
||||
S3fsMultiCurl curlmulti(GetMaxParallelCount());
|
||||
curlmulti.SetSuccessCallback(S3fsCurl::UploadMultipartPostCallback);
|
||||
curlmulti.SetRetryCallback(S3fsCurl::UploadMultipartPostRetryCallback);
|
||||
|
||||
@ -1433,7 +1441,7 @@ int S3fsCurl::ParallelGetObjectRequest(const char* tpath, int fd, off_t start, s
|
||||
|
||||
// cycle through open fd, pulling off 10MB chunks at a time
|
||||
for(remaining_bytes = size; 0 < remaining_bytes; ){
|
||||
S3fsMultiCurl curlmulti;
|
||||
S3fsMultiCurl curlmulti(GetMaxParallelCount());
|
||||
int para_cnt;
|
||||
off_t chunk;
|
||||
|
||||
@ -3847,27 +3855,13 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t
|
||||
return 0;
|
||||
}
|
||||
|
||||
//-------------------------------------------------------------------
|
||||
// Class S3fsMultiCurl
|
||||
//-------------------------------------------------------------------
|
||||
static const int MAX_MULTI_HEADREQ = 20; // default: max request count in readdir curl_multi.
|
||||
|
||||
//-------------------------------------------------------------------
|
||||
// Class method for S3fsMultiCurl
|
||||
//-------------------------------------------------------------------
|
||||
int S3fsMultiCurl::max_multireq = MAX_MULTI_HEADREQ;
|
||||
|
||||
int S3fsMultiCurl::SetMaxMultiRequest(int max)
|
||||
{
|
||||
int old = S3fsMultiCurl::max_multireq;
|
||||
S3fsMultiCurl::max_multireq= max;
|
||||
return old;
|
||||
}
|
||||
|
||||
//-------------------------------------------------------------------
|
||||
// method for S3fsMultiCurl
|
||||
//-------------------------------------------------------------------
|
||||
S3fsMultiCurl::S3fsMultiCurl() : SuccessCallback(NULL), RetryCallback(NULL)
|
||||
S3fsMultiCurl::S3fsMultiCurl(int maxParallelism)
|
||||
: maxParallelism(maxParallelism)
|
||||
, SuccessCallback(NULL)
|
||||
, RetryCallback(NULL)
|
||||
{
|
||||
}
|
||||
|
||||
@ -3930,7 +3924,7 @@ int S3fsMultiCurl::MultiPerform(void)
|
||||
std::vector<pthread_t> threads;
|
||||
bool success = true;
|
||||
bool isMultiHead = false;
|
||||
Semaphore sem(S3fsCurl::max_parallel_cnt);
|
||||
Semaphore sem(GetMaxParallelism());
|
||||
int rc;
|
||||
|
||||
for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); ++iter) {
|
||||
@ -3975,17 +3969,10 @@ int S3fsMultiCurl::MultiPerform(void)
|
||||
threads.push_back(thread);
|
||||
}
|
||||
|
||||
for(int i = 0; i < S3fsCurl::max_parallel_cnt; ++i){
|
||||
for(int i = 0; i < sem.get_value(); ++i){
|
||||
sem.wait();
|
||||
}
|
||||
|
||||
#ifdef __APPLE__
|
||||
// macOS cannot destroy a semaphore with posts less than the initializer
|
||||
for(int i = 0; i < S3fsCurl::max_parallel_cnt; ++i){
|
||||
sem.post();
|
||||
}
|
||||
#endif
|
||||
|
||||
for (std::vector<pthread_t>::iterator titer = threads.begin(); titer != threads.end(); ++titer) {
|
||||
void* retval;
|
||||
|
||||
@ -4082,9 +4069,8 @@ int S3fsMultiCurl::Request(void)
|
||||
while(!cMap_all.empty()){
|
||||
// set curl handle to multi handle
|
||||
int result;
|
||||
int cnt;
|
||||
s3fscurlmap_t::iterator iter;
|
||||
for(cnt = 0, iter = cMap_all.begin(); cnt < S3fsMultiCurl::max_multireq && iter != cMap_all.end(); cMap_all.erase(iter++), cnt++){
|
||||
for(iter = cMap_all.begin(); iter != cMap_all.end(); cMap_all.erase(iter++)){
|
||||
CURL* hCurl = (*iter).first;
|
||||
S3fsCurl* s3fscurl = (*iter).second;
|
||||
|
||||
|
12
src/curl.h
12
src/curl.h
@ -249,6 +249,7 @@ class S3fsCurl
|
||||
static mimes_t mimeTypes;
|
||||
static std::string userAgent;
|
||||
static int max_parallel_cnt;
|
||||
static int max_multireq;
|
||||
static off_t multipart_size;
|
||||
static bool is_sigv4;
|
||||
static bool is_ua; // User-Agent
|
||||
@ -389,8 +390,12 @@ class S3fsCurl
|
||||
}
|
||||
static long SetSslVerifyHostname(long value);
|
||||
static long GetSslVerifyHostname(void) { return S3fsCurl::ssl_verify_hostname; }
|
||||
// maximum parallel GET and PUT requests
|
||||
static int SetMaxParallelCount(int value);
|
||||
static int GetMaxParallelCount(void) { return S3fsCurl::max_parallel_cnt; }
|
||||
// maximum parallel HEAD requests
|
||||
static int SetMaxMultiRequest(int max);
|
||||
static int GetMaxMultiRequest(void) { return S3fsCurl::max_multireq; }
|
||||
static bool SetIsECS(bool flag);
|
||||
static bool SetIsIBMIAMAuth(bool flag);
|
||||
static size_t SetIAMFieldCount(size_t field_count);
|
||||
@ -470,7 +475,7 @@ typedef S3fsCurl* (*S3fsMultiRetryCallback)(S3fsCurl* s3fscurl); // callback for
|
||||
class S3fsMultiCurl
|
||||
{
|
||||
private:
|
||||
static int max_multireq;
|
||||
const int maxParallelism;
|
||||
|
||||
s3fscurlmap_t cMap_all; // all of curl requests
|
||||
s3fscurlmap_t cMap_req; // curl requests are sent
|
||||
@ -486,11 +491,10 @@ class S3fsMultiCurl
|
||||
static void* RequestPerformWrapper(void* arg);
|
||||
|
||||
public:
|
||||
S3fsMultiCurl();
|
||||
explicit S3fsMultiCurl(int maxParallelism);
|
||||
~S3fsMultiCurl();
|
||||
|
||||
static int SetMaxMultiRequest(int max);
|
||||
static int GetMaxMultiRequest(void) { return S3fsMultiCurl::max_multireq; }
|
||||
int GetMaxParallelism() { return maxParallelism; }
|
||||
|
||||
S3fsMultiSuccessCallback SetSuccessCallback(S3fsMultiSuccessCallback function);
|
||||
S3fsMultiRetryCallback SetRetryCallback(S3fsMultiRetryCallback function);
|
||||
|
@ -30,11 +30,19 @@
|
||||
class Semaphore
|
||||
{
|
||||
public:
|
||||
explicit Semaphore(int value) : sem(dispatch_semaphore_create(value)) {}
|
||||
~Semaphore() { dispatch_release(sem); }
|
||||
explicit Semaphore(int value) : value(value), sem(dispatch_semaphore_create(value)) {}
|
||||
~Semaphore() {
|
||||
// macOS cannot destroy a semaphore with posts less than the initializer
|
||||
for(int i = 0; i < get_value(); ++i){
|
||||
post();
|
||||
}
|
||||
dispatch_release(sem);
|
||||
}
|
||||
void wait() { dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); }
|
||||
void post() { dispatch_semaphore_signal(sem); }
|
||||
int get_value() const { return value; }
|
||||
private:
|
||||
const int value;
|
||||
dispatch_semaphore_t sem;
|
||||
};
|
||||
|
||||
@ -46,7 +54,7 @@ class Semaphore
|
||||
class Semaphore
|
||||
{
|
||||
public:
|
||||
explicit Semaphore(int value) { sem_init(&mutex, 0, value); }
|
||||
explicit Semaphore(int value) : value(value) { sem_init(&mutex, 0, value); }
|
||||
~Semaphore() { sem_destroy(&mutex); }
|
||||
void wait()
|
||||
{
|
||||
@ -56,7 +64,9 @@ class Semaphore
|
||||
} while (r == -1 && errno == EINTR);
|
||||
}
|
||||
void post() { sem_post(&mutex); }
|
||||
int get_value() const { return value; }
|
||||
private:
|
||||
const int value;
|
||||
sem_t mutex;
|
||||
};
|
||||
|
||||
|
118
src/s3fs.cpp
118
src/s3fs.cpp
@ -2358,7 +2358,7 @@ static S3fsCurl* multi_head_retry_callback(S3fsCurl* s3fscurl)
|
||||
|
||||
static int readdir_multi_head(const char* path, S3ObjList& head, void* buf, fuse_fill_dir_t filler)
|
||||
{
|
||||
S3fsMultiCurl curlmulti;
|
||||
S3fsMultiCurl curlmulti(S3fsCurl::GetMaxMultiRequest());
|
||||
s3obj_list_t headlist;
|
||||
s3obj_list_t fillerlist;
|
||||
int result = 0;
|
||||
@ -2372,75 +2372,69 @@ static int readdir_multi_head(const char* path, S3ObjList& head, void* buf, fuse
|
||||
curlmulti.SetSuccessCallback(multi_head_callback);
|
||||
curlmulti.SetRetryCallback(multi_head_retry_callback);
|
||||
|
||||
// Loop
|
||||
while(!headlist.empty()){
|
||||
s3obj_list_t::iterator iter;
|
||||
long cnt;
|
||||
// TODO: deindent
|
||||
s3obj_list_t::iterator iter;
|
||||
|
||||
fillerlist.clear();
|
||||
// Make single head request(with max).
|
||||
for(iter = headlist.begin(), cnt = 0; headlist.end() != iter && cnt < S3fsMultiCurl::GetMaxMultiRequest(); iter = headlist.erase(iter)){
|
||||
string disppath = path + (*iter);
|
||||
string etag = head.GetETag((*iter).c_str());
|
||||
fillerlist.clear();
|
||||
// Make single head request(with max).
|
||||
for(iter = headlist.begin(); headlist.end() != iter; iter = headlist.erase(iter)){
|
||||
string disppath = path + (*iter);
|
||||
string etag = head.GetETag((*iter).c_str());
|
||||
|
||||
string fillpath = disppath;
|
||||
if('/' == disppath[disppath.length() - 1]){
|
||||
fillpath = fillpath.substr(0, fillpath.length() -1);
|
||||
}
|
||||
fillerlist.push_back(fillpath);
|
||||
string fillpath = disppath;
|
||||
if('/' == disppath[disppath.length() - 1]){
|
||||
fillpath = fillpath.substr(0, fillpath.length() -1);
|
||||
}
|
||||
fillerlist.push_back(fillpath);
|
||||
|
||||
if(StatCache::getStatCacheData()->HasStat(disppath, etag.c_str())){
|
||||
continue;
|
||||
}
|
||||
|
||||
// First check for directory, start checking "not SSE-C".
|
||||
// If checking failed, retry to check with "SSE-C" by retry callback func when SSE-C mode.
|
||||
S3fsCurl* s3fscurl = new S3fsCurl();
|
||||
if(!s3fscurl->PreHeadRequest(disppath, (*iter), disppath)){ // target path = cache key path.(ex "dir/")
|
||||
S3FS_PRN_WARN("Could not make curl object for head request(%s).", disppath.c_str());
|
||||
delete s3fscurl;
|
||||
continue;
|
||||
}
|
||||
|
||||
if(!curlmulti.SetS3fsCurlObject(s3fscurl)){
|
||||
S3FS_PRN_WARN("Could not make curl object into multi curl(%s).", disppath.c_str());
|
||||
delete s3fscurl;
|
||||
continue;
|
||||
}
|
||||
cnt++; // max request count within S3fsMultiCurl::GetMaxMultiRequest()
|
||||
if(StatCache::getStatCacheData()->HasStat(disppath, etag.c_str())){
|
||||
continue;
|
||||
}
|
||||
|
||||
// Multi request
|
||||
if(0 != (result = curlmulti.Request())){
|
||||
// If result is -EIO, it is something error occurred.
|
||||
// This case includes that the object is encrypting(SSE) and s3fs does not have keys.
|
||||
// So s3fs set result to 0 in order to continue the process.
|
||||
if(-EIO == result){
|
||||
S3FS_PRN_WARN("error occurred in multi request(errno=%d), but continue...", result);
|
||||
result = 0;
|
||||
}else{
|
||||
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
|
||||
break;
|
||||
}
|
||||
// First check for directory, start checking "not SSE-C".
|
||||
// If checking failed, retry to check with "SSE-C" by retry callback func when SSE-C mode.
|
||||
S3fsCurl* s3fscurl = new S3fsCurl();
|
||||
if(!s3fscurl->PreHeadRequest(disppath, (*iter), disppath)){ // target path = cache key path.(ex "dir/")
|
||||
S3FS_PRN_WARN("Could not make curl object for head request(%s).", disppath.c_str());
|
||||
delete s3fscurl;
|
||||
continue;
|
||||
}
|
||||
|
||||
// populate fuse buffer
|
||||
// here is best position, because a case is cache size < files in directory
|
||||
//
|
||||
for(iter = fillerlist.begin(); fillerlist.end() != iter; ++iter){
|
||||
struct stat st;
|
||||
string bpath = mybasename((*iter));
|
||||
if(StatCache::getStatCacheData()->GetStat((*iter), &st)){
|
||||
filler(buf, bpath.c_str(), &st, 0);
|
||||
}else{
|
||||
S3FS_PRN_INFO2("Could not find %s file in stat cache.", (*iter).c_str());
|
||||
filler(buf, bpath.c_str(), 0, 0);
|
||||
}
|
||||
if(!curlmulti.SetS3fsCurlObject(s3fscurl)){
|
||||
S3FS_PRN_WARN("Could not make curl object into multi curl(%s).", disppath.c_str());
|
||||
delete s3fscurl;
|
||||
continue;
|
||||
}
|
||||
|
||||
// reinit for loop.
|
||||
curlmulti.Clear();
|
||||
}
|
||||
|
||||
// Multi request
|
||||
if(0 != (result = curlmulti.Request())){
|
||||
// If result is -EIO, it is something error occurred.
|
||||
// This case includes that the object is encrypting(SSE) and s3fs does not have keys.
|
||||
// So s3fs set result to 0 in order to continue the process.
|
||||
if(-EIO == result){
|
||||
S3FS_PRN_WARN("error occurred in multi request(errno=%d), but continue...", result);
|
||||
result = 0;
|
||||
}else{
|
||||
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
// populate fuse buffer
|
||||
// here is best position, because a case is cache size < files in directory
|
||||
//
|
||||
for(iter = fillerlist.begin(); fillerlist.end() != iter; ++iter){
|
||||
struct stat st;
|
||||
string bpath = mybasename((*iter));
|
||||
if(StatCache::getStatCacheData()->GetStat((*iter), &st)){
|
||||
filler(buf, bpath.c_str(), &st, 0);
|
||||
}else{
|
||||
S3FS_PRN_INFO2("Could not find %s file in stat cache.", (*iter).c_str());
|
||||
filler(buf, bpath.c_str(), 0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -4445,7 +4439,7 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar
|
||||
}
|
||||
if(0 == STR2NCMP(arg, "multireq_max=")){
|
||||
long maxreq = static_cast<long>(s3fs_strtoofft(strchr(arg, '=') + sizeof(char)));
|
||||
S3fsMultiCurl::SetMaxMultiRequest(maxreq);
|
||||
S3fsCurl::SetMaxMultiRequest(maxreq);
|
||||
return 0;
|
||||
}
|
||||
if(0 == strcmp(arg, "nonempty")){
|
||||
|
Loading…
Reference in New Issue
Block a user