Fixed Issue 235

1) Problems using an encrypted connection to s3 (Issue 235)
    In the s3fs_readdir() function, s3fs got a CURLE_COULDNT_CONNECT error while reading
    object header information.
    The probable cause is issuing too many requests in a single curl_multi batch.
    The s3fs code is changed as follows:
    * The maximum number of requests per curl_multi batch is 500, and s3fs loops over
      curl_multi calls until all requests are handled.
    * Requests that fail with CURLE_COULDNT_CONNECT are retried.
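
    For illustration only, not part of this commit: a minimal, self-contained sketch of
    the batch-and-retry pattern described above. MAX_MULTI_HEADREQ and the
    "retry while a pass still succeeds" loop condition mirror the diff below;
    make_head_handle(), multi_head_all() and the URLs in main() are hypothetical
    stand-ins for s3fs's create_head_handle() and object list, and curl_multi_wait()
    stands in for the diff's select() loop to keep the sketch short.

#include <curl/curl.h>
#include <cstdio>
#include <list>
#include <map>
#include <string>

#define MAX_MULTI_HEADREQ 500

typedef std::list<std::string> urllist_t;
typedef std::map<CURL*, urllist_t::iterator> inflight_t;

// Hypothetical helper: build one HEAD request handle for a URL.
static CURL* make_head_handle(const std::string& url)
{
  CURL* h = curl_easy_init();
  curl_easy_setopt(h, CURLOPT_URL, url.c_str());
  curl_easy_setopt(h, CURLOPT_NOBODY, 1L);          // HEAD request, no body
  return h;
}

static int multi_head_all(urllist_t& urls)
{
  // Retry loop: stop once a pass removes nothing (every request kept failing).
  for(size_t last_remaining = urls.size() + 1;
      last_remaining > urls.size() && !urls.empty(); ){
    last_remaining = urls.size();
    CURLM* mh = curl_multi_init();
    inflight_t inflight;

    // Fill one batch with at most MAX_MULTI_HEADREQ easy handles.
    int cnt = 0;
    for(urllist_t::iterator it = urls.begin();
        it != urls.end() && cnt < MAX_MULTI_HEADREQ; ++it, ++cnt){
      CURL* h = make_head_handle(*it);
      inflight[h] = it;
      curl_multi_add_handle(mh, h);
    }

    // Drive the batch to completion, waiting between perform calls.
    int still_running = 0;
    do{
      if(CURLM_OK != curl_multi_perform(mh, &still_running)){
        break;
      }
      if(still_running){
        curl_multi_wait(mh, NULL, 0, 1000, NULL);   // wait up to 1s for activity
      }
    }while(still_running);

    // Transport-level successes leave the list; failures stay for the next pass.
    CURLMsg* msg;
    int msgs_left = 0;
    while(NULL != (msg = curl_multi_info_read(mh, &msgs_left))){
      if(CURLMSG_DONE == msg->msg){
        CURL* h = msg->easy_handle;
        if(CURLE_OK == msg->data.result){
          urls.erase(inflight[h]);
        }
        curl_multi_remove_handle(mh, h);
        curl_easy_cleanup(h);
        inflight.erase(h);
      }
    }
    // Release any handle that never reported (e.g. after an early break).
    for(inflight_t::iterator it = inflight.begin(); it != inflight.end(); ++it){
      curl_multi_remove_handle(mh, it->first);
      curl_easy_cleanup(it->first);
    }
    curl_multi_cleanup(mh);
  }
  return urls.empty() ? 0 : -1;                     // leftovers never succeeded
}

int main()
{
  curl_global_init(CURL_GLOBAL_ALL);
  urllist_t urls;
  urls.push_back("https://example.com/");           // hypothetical object URLs
  urls.push_back("https://example.com/a");
  int rc = multi_head_all(urls);
  printf("multi_head_all: %d (%zu left)\n", rc, urls.size());
  curl_global_cleanup();
  return rc ? 1 : 0;
}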




git-svn-id: http://s3fs.googlecode.com/svn/trunk@430 df820570-a93a-0410-bd06-b72b767a4274
ggtakec@gmail.com 2013-05-27 01:15:48 +00:00
parent 7477224d02
commit 7aa11f389a
2 changed files with 168 additions and 139 deletions


@ -55,6 +55,7 @@ class auto_curl_slist {
// header data
struct head_data {
std::string base_path;
std::string path;
std::string *url;
struct curl_slist *requestHeaders;
@ -81,7 +82,7 @@ class auto_head {
public:
auto_head() {}
~auto_head() {
removeAll();
}
headMap_t& get() { return headMap; }
@ -101,6 +102,10 @@ class auto_head {
headMap.erase(iter);
}
void removeAll(void) {
for_each(headMap.begin(), headMap.end(), cleanup_head_data());
headMap.clear(); // empty the map so the destructor does not clean the same entries twice
}
private:
headMap_t headMap;
};

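For context, not part of this commit: a compilable miniature of the auto_head wrapper above, simplified to one owned resource per entry (the real head_data also carries a url string and response headers, released the same way by cleanup_head_data). It shows the RAII intent of the change: removeAll() can be called explicitly on the success path, while the destructor covers every early-return path, and clearing the map after the sweep keeps the two from freeing the same entry twice.

#include <curl/curl.h>
#include <algorithm>
#include <map>
#include <string>

struct head_data {
  std::string path;
  struct curl_slist* requestHeaders;
};

typedef std::map<CURL*, head_data> headMap_t;

// Assumed behavior: free everything one entry owns, easy handle included.
struct cleanup_head_data {
  void operator()(std::pair<CURL* const, head_data>& p) const {
    curl_slist_free_all(p.second.requestHeaders);
    curl_easy_cleanup(p.first);
  }
};

class auto_head {
 public:
  auto_head() {}
  ~auto_head() { removeAll(); }
  headMap_t& get() { return headMap; }
  void remove(CURL* h) {
    headMap_t::iterator it = headMap.find(h);
    if(headMap.end() == it){
      return;
    }
    cleanup_head_data()(*it);
    headMap.erase(it);
  }
  void removeAll(void) {
    std::for_each(headMap.begin(), headMap.end(), cleanup_head_data());
    headMap.clear();   // a second call (or the destructor) is now a no-op
  }
 private:
  headMap_t headMap;
};

int main()
{
  curl_global_init(CURL_GLOBAL_ALL);
  {
    auto_head curl_map;
    CURL* h = curl_easy_init();
    head_data d;
    d.path = "/bucket/object";
    d.requestHeaders = curl_slist_append(NULL, "X-Example: 1");
    curl_map.get()[h] = d;
  }   // scope exit: the slist and the handle are released exactly once
  curl_global_cleanup();
  return 0;
}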

@ -53,6 +53,7 @@ using namespace std;
//-------------------------------------------------------------------
// Define
//-------------------------------------------------------------------
#define MAX_MULTI_HEADREQ 500 // max request count in readdir curl_multi.
#define DIRTYPE_UNKNOWN -1
#define DIRTYPE_NEW 0
#define DIRTYPE_OLD 1
@ -135,6 +136,7 @@ static int get_object_attribute(const char *path, struct stat *pstbuf, headers_t
static int check_object_access(const char *path, int mask, struct stat* pstbuf);
static int check_object_owner(const char *path, struct stat* pstbuf);
static int check_parent_object_access(const char *path, int mask);
static int readdir_multi_head(const char *path, S3ObjList& head);
static int list_bucket(const char *path, S3ObjList& head, const char* delimiter);
static int directory_empty(const char *path);
static bool is_truncated(const char *xml);
@ -2900,27 +2902,173 @@ static int s3fs_opendir(const char *path, struct fuse_file_info *fi)
return result;
}
static int readdir_multi_head(const char *path, S3ObjList& head)
{
CURLM* mh;
CURLMcode curlm_code;
CURLMsg* msg;
int still_running;
int remaining_messages;
int cnt;
size_t last_remaining_host;
s3obj_list_t headlist;
auto_head curl_map; // deletes head_data entries and their curl handles automatically.
s3obj_list_t::iterator liter;
// Make base path list.
head.GetNameList(headlist, true, false); // get name with "/".
FGPRINT(" readdir_multi_head[path=%s][list=%ld]\n", path, headlist.size());
// Make the request list.
//
// Send multi request loop (with retry)
// (when many requests are sent at once, we sometimes get "Couldn't connect to server")
//
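// The outer loop makes one batched pass per iteration and exits once a pass
// removes nothing from headlist, i.e. every remaining request keeps failing.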
for(last_remaining_host = headlist.size() + 1; last_remaining_host > headlist.size() && headlist.size(); ){
last_remaining_host = headlist.size();
// populate the multi interface with an initial set of requests
mh = curl_multi_init();
// Make single head requests (up to MAX_MULTI_HEADREQ per pass).
for(liter = headlist.begin(), cnt = 0; headlist.end() != liter && cnt < MAX_MULTI_HEADREQ; ){
string fullpath = path + (*liter);
string fullorg = path + head.GetOrgName((*liter).c_str());
string etag = head.GetETag((*liter).c_str());
if(StatCache::getStatCacheData()->HasStat(fullpath, etag.c_str())){
liter = headlist.erase(liter);
continue;
}
// file not cached, prepare a call to get_headers
head_data request_data;
request_data.base_path = (*liter);
request_data.path = fullorg;
CURL* curl_handle = create_head_handle(&request_data);
my_set_curl_share(curl_handle); // set dns cache
request_data.path = fullpath; // Notice: replace the original path with the normalized one for the cache key.
curl_map.get()[curl_handle] = request_data;
// add this handle to the multi handle
if(CURLM_OK != (curlm_code = curl_multi_add_handle(mh, curl_handle))){
SYSLOGERR("readdir_multi_head: curl_multi_add_handle code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code));
FGPRINT(" readdir_multi_head: curl_multi_add_handle code: %d msg: %s\n", curlm_code, curl_multi_strerror(curlm_code));
curl_multi_cleanup(mh);
return -EIO;
}
liter++;
cnt++; // max request count in multi-request is MAX_MULTI_HEADREQ.
}
// Send multi request.
do{
// Start making requests and check running.
still_running = 0;
do {
curlm_code = curl_multi_perform(mh, &still_running);
} while(curlm_code == CURLM_CALL_MULTI_PERFORM);
if(curlm_code != CURLM_OK) {
SYSLOGERR("readdir_multi_head: curl_multi_perform code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code));
FGPRINT(" readdir_multi_head: curl_multi_perform code: %d msg: %s\n", curlm_code, curl_multi_strerror(curlm_code));
}
// Wait for activity (with curl's suggested timeout) while requests are still running
if(still_running) {
long milliseconds;
fd_set r_fd;
fd_set w_fd;
fd_set e_fd;
FD_ZERO(&r_fd);
FD_ZERO(&w_fd);
FD_ZERO(&e_fd);
if(CURLM_OK != (curlm_code = curl_multi_timeout(mh, &milliseconds))){
SYSLOGERR("readdir_multi_head: curl_multi_timeout code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code));
FGPRINT(" readdir_multi_head: curl_multi_timeout code: %d msg: %s\n", curlm_code, curl_multi_strerror(curlm_code));
}
if(milliseconds < 0){
milliseconds = 50;
}
if(milliseconds > 0) {
int max_fd;
struct timeval timeout;
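// milliseconds -> timeval: 1000 * ms / 1000000 is the whole-seconds part,
// 1000 * ms % 1000000 the remaining microseconds.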
timeout.tv_sec = 1000 * milliseconds / 1000000;
timeout.tv_usec = 1000 * milliseconds % 1000000;
if(CURLM_OK != (curlm_code = curl_multi_fdset(mh, &r_fd, &w_fd, &e_fd, &max_fd))){
SYSLOGERR("readdir_multi_head: curl_multi_fdset code: %d msg: %s", curlm_code, curl_multi_strerror(curlm_code));
FGPRINT(" readdir_multi_head: curl_multi_fdset code: %d msg: %s\n", curlm_code, curl_multi_strerror(curlm_code));
curl_multi_cleanup(mh);
return -EIO;
}
if(-1 == select(max_fd + 1, &r_fd, &w_fd, &e_fd, &timeout)){
curl_multi_cleanup(mh);
YIKES(-errno);
}
}
}
}while(still_running);
// Read the result
while((msg = curl_multi_info_read(mh, &remaining_messages))) {
if(CURLMSG_DONE != msg->msg){
SYSLOGERR("readdir_multi_head: curl_multi_info_read code: %d", msg->msg);
FGPRINT(" readdir_multi_head: curl_multi_info_read code: %d\n", msg->msg);
curl_multi_cleanup(mh);
return -EIO;
}
if(CURLE_OK == msg->data.result){
head_data response = curl_map.get()[msg->easy_handle];
long responseCode = -1;
if(CURLE_OK == curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &responseCode) && 400 > responseCode){
// add into stat cache
if(!StatCache::getStatCacheData()->AddStat(response.path, (*response.responseHeaders))){
FGPRINT(" readdir_multi_head: failed adding stat cache [path=%s]\n", response.path.c_str());
}
}else{
// This case is a directory object ("dir", "non dir object", "_$folder$", etc.)
//FGPRINT(" readdir_multi_head: failed a request(%s)\n", response.base_path.c_str());
}
// remove request path.
headlist.remove(response.base_path);
}else{
SYSLOGDBGERR("readdir_multi_head: failed to read - remaining_msgs: %i code: %d msg: %s",
remaining_messages, msg->data.result, curl_easy_strerror(msg->data.result));
FGPRINT(" readdir_multi_head: failed to read - remaining_msgs: %i code: %d msg: %s\n",
remaining_messages, msg->data.result, curl_easy_strerror(msg->data.result));
}
// Cleanup this curl handle and headers
curl_multi_remove_handle(mh, msg->easy_handle);
curl_map.remove(msg->easy_handle); // also destroys the curl handle.
}
curl_multi_cleanup(mh);
curl_map.removeAll(); // destroys any curl handles that never completed.
}
return 0;
}
static int s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, struct fuse_file_info *fi)
{
S3ObjList head;
s3obj_list_t headlist;
int result;
FGPRINT("s3fs_readdir[path=%s]\n", path);
if(0 != (result = check_object_access(path, X_OK, NULL))){
return result;
}
// get a list of all the objects
if((result = list_bucket(path, head, "/")) != 0){
FGPRINT(" s3fs_readdir list_bucket returns error(%d).\n", result);
return result;
}
@ -2938,139 +3086,15 @@ static int s3fs_readdir(
filler(buf, (*liter).c_str(), 0, 0);
}
// Send multi head request for stats caching.
string strpath = path;
if(strcmp(path, "/") != 0){
strpath += "/";
}
if(0 != (result = readdir_multi_head(strpath.c_str(), head))){
FGPRINT(" s3fs_readdir readdir_multi_head returns error(%d).\n", result);
}
return result;
}
static int list_bucket(const char *path, S3ObjList& head, const char* delimiter)