From 7aa11f389a71227a26566ee75c5c2fc2be620053 Mon Sep 17 00:00:00 2001 From: "ggtakec@gmail.com" Date: Mon, 27 May 2013 01:15:48 +0000 Subject: [PATCH] Fixed Issue 235 1) Problems using encrypted connection to s3(Issue 235) In s3fs_readdir() function, s3fs gets CURLE_COULDNT_CONNECT error when s3fs reads objects header information. Probably, this problem is too many request in curl_multi request. Then s3fs codes are changed: * max request in curl_multi request is 500 and s3fs loops to call curl_multi. * retries to call request which returns error as CURLE_COULDNT_CONNECT. git-svn-id: http://s3fs.googlecode.com/svn/trunk@430 df820570-a93a-0410-bd06-b72b767a4274 --- src/curl.h | 7 +- src/s3fs.cpp | 300 +++++++++++++++++++++++++++------------------------ 2 files changed, 168 insertions(+), 139 deletions(-) diff --git a/src/curl.h b/src/curl.h index cdf6b7e..cfa7a05 100644 --- a/src/curl.h +++ b/src/curl.h @@ -55,6 +55,7 @@ class auto_curl_slist { // header data struct head_data { + std::string base_path; std::string path; std::string *url; struct curl_slist *requestHeaders; @@ -81,7 +82,7 @@ class auto_head { public: auto_head() {} ~auto_head() { - for_each(headMap.begin(), headMap.end(), cleanup_head_data()); + removeAll(); } headMap_t& get() { return headMap; } @@ -101,6 +102,10 @@ class auto_head { headMap.erase(iter); } + void removeAll(void) { + for_each(headMap.begin(), headMap.end(), cleanup_head_data()); + } + private: headMap_t headMap; }; diff --git a/src/s3fs.cpp b/src/s3fs.cpp index 075953a..c5e79e6 100644 --- a/src/s3fs.cpp +++ b/src/s3fs.cpp @@ -53,6 +53,7 @@ using namespace std; //------------------------------------------------------------------- // Define //------------------------------------------------------------------- +#define MAX_MULTI_HEADREQ 500 // max request count in readdir curl_multi. #define DIRTYPE_UNKNOWN -1 #define DIRTYPE_NEW 0 #define DIRTYPE_OLD 1 @@ -135,6 +136,7 @@ static int get_object_attribute(const char *path, struct stat *pstbuf, headers_t static int check_object_access(const char *path, int mask, struct stat* pstbuf); static int check_object_owner(const char *path, struct stat* pstbuf); static int check_parent_object_access(const char *path, int mask); +static int readdir_multi_head(const char *path, S3ObjList& head); static int list_bucket(const char *path, S3ObjList& head, const char* delimiter); static int directory_empty(const char *path); static bool is_truncated(const char *xml); @@ -2900,27 +2902,173 @@ static int s3fs_opendir(const char *path, struct fuse_file_info *fi) return result; } -static int s3fs_readdir( - const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, struct fuse_file_info *fi) { - CURLM *mh; - CURLMsg *msg; - CURLMcode curlm_code; - int n_reqs; - int remaining_messages; +static int readdir_multi_head(const char *path, S3ObjList& head) +{ + CURLM* mh; + CURLMcode curlm_code; + CURLMsg* msg; + int still_running; + int remaining_messages; + int cnt; + size_t last_remaining_host; + s3obj_list_t headlist; + auto_head curl_map; // delete object and curl handle automatically. + s3obj_list_t::iterator liter; + + // Make base path list. + head.GetNameList(headlist, true, false); // get name with "/". + + FGPRINT(" readdir_multi_head[path=%s][list=%ld]\n", path, headlist.size()); + + // Make request list. + // + // Send multi request loop( with retry ) + // (When many request is sends, sometimes gets "Couldn't connect to server") + // + for(last_remaining_host = headlist.size() + 1; last_remaining_host > headlist.size() && headlist.size(); ){ + last_remaining_host = headlist.size(); + + // populate the multi interface with an initial set of requests + mh = curl_multi_init(); + + // Make single head request. + for(liter = headlist.begin(), cnt = 0; headlist.end() != liter && cnt < MAX_MULTI_HEADREQ; ){ + string fullpath = path + (*liter); + string fullorg = path + head.GetOrgName((*liter).c_str()); + string etag = head.GetETag((*liter).c_str()); + + if(StatCache::getStatCacheData()->HasStat(fullpath, etag.c_str())){ + liter = headlist.erase(liter); + continue; + } + + // file not cached, prepare a call to get_headers + head_data request_data; + request_data.base_path = (*liter); + request_data.path = fullorg; + CURL* curl_handle = create_head_handle(&request_data); + my_set_curl_share(curl_handle); // set dns cache + request_data.path = fullpath; // Notice: replace org to normalized for cache key. + curl_map.get()[curl_handle] = request_data; + + // add this handle to the multi handle + if(CURLM_OK != (curlm_code = curl_multi_add_handle(mh, curl_handle))){ + SYSLOGERR("readdir_multi_head: curl_multi_add_handle code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code)); + FGPRINT(" readdir_multi_head: curl_multi_add_handle code: %d msg: %s\n", curlm_code, curl_multi_strerror(curlm_code)); + curl_multi_cleanup(mh); + return -EIO; + } + liter++; + cnt++; // max request count in multi-request is MAX_MULTI_HEADREQ. + } + + // Send multi request. + do{ + // Start making requests and check running. + still_running = 0; + do { + curlm_code = curl_multi_perform(mh, &still_running); + } while(curlm_code == CURLM_CALL_MULTI_PERFORM); + + if(curlm_code != CURLM_OK) { + SYSLOGERR("readdir_multi_head: curl_multi_perform code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code)); + FGPRINT(" readdir_multi_head: curl_multi_perform code: %d msg: %s\n", curlm_code, curl_multi_strerror(curlm_code)); + } + + // Set timer when still running + if(still_running) { + long milliseconds; + fd_set r_fd; + fd_set w_fd; + fd_set e_fd; + FD_ZERO(&r_fd); + FD_ZERO(&w_fd); + FD_ZERO(&e_fd); + + if(CURLM_OK != (curlm_code = curl_multi_timeout(mh, &milliseconds))){ + SYSLOGERR("readdir_multi_head: curl_multi_timeout code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code)); + FGPRINT(" readdir_multi_head: curl_multi_timeout code: %d msg: %s\n", curlm_code, curl_multi_strerror(curlm_code)); + } + if(milliseconds < 0){ + milliseconds = 50; + } + if(milliseconds > 0) { + int max_fd; + struct timeval timeout; + timeout.tv_sec = 1000 * milliseconds / 1000000; + timeout.tv_usec = 1000 * milliseconds % 1000000; + + if(CURLM_OK != (curlm_code = curl_multi_fdset(mh, &r_fd, &w_fd, &e_fd, &max_fd))){ + SYSLOGERR("readdir_multi_head: curl_multi_fdset code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code)); + FGPRINT(" readdir_multi_head: curl_multi_fdset code: %d msg: %s\n", curlm_code, curl_multi_strerror(curlm_code)); + curl_multi_cleanup(mh); + return -EIO; + } + if(-1 == select(max_fd + 1, &r_fd, &w_fd, &e_fd, &timeout)){ + curl_multi_cleanup(mh); + YIKES(-errno); + } + } + } + }while(still_running); + + // Read the result + while((msg = curl_multi_info_read(mh, &remaining_messages))) { + if(CURLMSG_DONE != msg->msg){ + SYSLOGERR("readdir_multi_head: curl_multi_info_read code: %d", msg->msg); + FGPRINT(" readdir_multi_head: curl_multi_info_read code: %d\n", msg->msg); + curl_multi_cleanup(mh); + return -EIO; + } + + if(CURLE_OK == msg->data.result){ + head_data response= curl_map.get()[msg->easy_handle]; + long responseCode = -1; + + if(CURLE_OK == curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &responseCode) && 400 > responseCode){ + // add into stat cache + if(!StatCache::getStatCacheData()->AddStat(response.path, (*response.responseHeaders))){ + FGPRINT(" readdir_multi_head: failed adding stat cache [path=%s]\n", response.path.c_str()); + } + }else{ + // This case is directory object("dir", "non dir object", "_$folder$", etc) + //FGPRINT(" readdir_multi_head: failed a request(%s)\n", response.base_path.c_str()); + } + // remove request path. + headlist.remove(response.base_path); + + }else{ + SYSLOGDBGERR("readdir_multi_head: failed to read - remaining_msgs: %i code: %d msg: %s", + remaining_messages, msg->data.result, curl_easy_strerror(msg->data.result)); + FGPRINT(" readdir_multi_head: failed to read - remaining_msgs: %i code: %d msg: %s\n", + remaining_messages, msg->data.result, curl_easy_strerror(msg->data.result)); + } + + // Cleanup this curl handle and headers + curl_multi_remove_handle(mh, msg->easy_handle); + curl_map.remove(msg->easy_handle); // with destroy curl handle. + } + curl_multi_cleanup(mh); + curl_map.removeAll(); // with destroy curl handle. + } + return 0; +} + +static int s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, struct fuse_file_info *fi) +{ S3ObjList head; s3obj_list_t headlist; - auto_head curl_map; + int result; FGPRINT("s3fs_readdir[path=%s]\n", path); - int result; if(0 != (result = check_object_access(path, X_OK, NULL))){ return result; } // get a list of all the objects if((result = list_bucket(path, head, "/")) != 0){ - FGPRINT(" s3fs_readdir list_bucket returns error.\n"); + FGPRINT(" s3fs_readdir list_bucket returns error(%d).\n", result); return result; } @@ -2938,139 +3086,15 @@ static int s3fs_readdir( filler(buf, (*liter).c_str(), 0, 0); } - // populate the multi interface with an initial set of requests - n_reqs = 0; - mh = curl_multi_init(); - - // Add curl handle to multi session. + // Send multi head request for stats caching. string strpath = path; if(strcmp(path, "/") != 0){ strpath += "/"; } - headlist.clear(); - head.GetNameList(headlist, true, false); // get name with "/". - for(liter = headlist.begin(); headlist.end() != liter; liter++){ - string fullpath = strpath + (*liter); - string fullorg = strpath + head.GetOrgName((*liter).c_str()); - string etag = head.GetETag((*liter).c_str()); - - if(StatCache::getStatCacheData()->HasStat(fullpath, etag.c_str())) { - continue; - } - - // file not cached, prepare a call to get_headers - head_data request_data; - request_data.path = fullorg; - CURL* curl_handle = create_head_handle(&request_data); - my_set_curl_share(curl_handle); // set dns cache - request_data.path = fullpath; // Notice: replace org to normalized for cache key. - curl_map.get()[curl_handle] = request_data; - - // add this handle to the multi handle - n_reqs++; - curlm_code = curl_multi_add_handle(mh, curl_handle); - if(curlm_code != CURLM_OK) { - SYSLOGERR("readdir: curl_multi_add_handle code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code)); - curl_multi_cleanup(mh); - return -EIO; - } + if(0 != (result = readdir_multi_head(strpath.c_str(), head))){ + FGPRINT(" s3fs_readdir readdir_multi_head returns error(%d).\n", result); } - - int still_running = 0; - do{ - // Start making requests and check running. - still_running = 0; - do { - curlm_code = curl_multi_perform(mh, &still_running); - } while(curlm_code == CURLM_CALL_MULTI_PERFORM); - - if(curlm_code != CURLM_OK) { - SYSLOGERR("readdir: curl_multi_perform code: %d msg: %s", - curlm_code, curl_multi_strerror(curlm_code)); - } - - // Set timer when still running - if(still_running) { - fd_set r_fd; - fd_set w_fd; - fd_set e_fd; - FD_ZERO(&r_fd); - FD_ZERO(&w_fd); - FD_ZERO(&e_fd); - - long milliseconds; - curlm_code = curl_multi_timeout(mh, &milliseconds); - if(curlm_code != CURLM_OK) { - SYSLOGERR("readdir: curl_multi_perform code: %d msg: %s", - curlm_code, curl_multi_strerror(curlm_code)); - } - - if(milliseconds < 0) - milliseconds = 50; - if(milliseconds > 0) { - struct timeval timeout; - timeout.tv_sec = 1000 * milliseconds / 1000000; - timeout.tv_usec = 1000 * milliseconds % 1000000; - - int max_fd; - curlm_code = curl_multi_fdset(mh, &r_fd, &w_fd, &e_fd, &max_fd); - if(curlm_code != CURLM_OK) { - SYSLOGERR("readdir: curl_multi_fdset code: %d msg: %s", - curlm_code, curl_multi_strerror(curlm_code)); - - curl_multi_cleanup(mh); - return -EIO; - } - - if(select(max_fd + 1, &r_fd, &w_fd, &e_fd, &timeout) == -1){ - curl_multi_cleanup(mh); - YIKES(-errno); - } - } - } - - // Read the result - while((msg = curl_multi_info_read(mh, &remaining_messages))) { - if(msg->msg != CURLMSG_DONE) { - SYSLOGERR("readdir: curl_multi_add_handle code: %d msg: %s", - curlm_code, curl_multi_strerror(curlm_code)); - curl_multi_cleanup(mh); - return -EIO; - } - - CURLcode code = msg->data.result; - if(code != 0) { - SYSLOGERR("s3fs_readdir: remaining_msgs: %i code: %d msg: %s", - remaining_messages, code, curl_easy_strerror(code)); - curl_multi_cleanup(mh); - return -EIO; - } - - CURL *curl_handle = msg->easy_handle; - head_data response= curl_map.get()[curl_handle]; - long responseCode = -1; - if(CURLE_OK != curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &responseCode) || 400 <= responseCode){ - curl_multi_remove_handle(mh, curl_handle); - curl_map.remove(curl_handle); - n_reqs--; - continue; - } - - // add into stat cache - if(!StatCache::getStatCacheData()->AddStat(response.path, (*response.responseHeaders))){ - FGPRINT("s3fs_readdir: failed adding stat cache [path=%s]\n", response.path.c_str()); - } - - // cleanup - curl_multi_remove_handle(mh, curl_handle); - curl_map.remove(curl_handle); - n_reqs--; - } - }while(still_running); - - curl_multi_cleanup(mh); - - return 0; + return result; } static int list_bucket(const char *path, S3ObjList& head, const char* delimiter)