From 8484ddab8a260370565060c54ad92810db4a99d0 Mon Sep 17 00:00:00 2001 From: rrizun Date: Sat, 23 Feb 2008 15:35:40 +0000 Subject: [PATCH] readdir: parallel git-svn-id: http://s3fs.googlecode.com/svn/trunk@92 df820570-a93a-0410-bd06-b72b767a4274 --- s3fs/s3fs.cpp | 284 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 250 insertions(+), 34 deletions(-) diff --git a/s3fs/s3fs.cpp b/s3fs/s3fs.cpp index 20b2a46..3e5ac78 100644 --- a/s3fs/s3fs.cpp +++ b/s3fs/s3fs.cpp @@ -1,5 +1,5 @@ /* - * s3fs - FUSE filesystem backed by Amazon S3 + * s3fs - FUSE-based file system backed by Amazon S3 * * Copyright 2007-2008 Randy Rizun * @@ -116,30 +116,57 @@ public: stack curl_handles; static pthread_mutex_t curl_handles_lock; -class auto_curl { +static CURL* +alloc_curl_handle() { CURL* curl; -public: - auto_curl() { - auto_lock lock(curl_handles_lock); - if (curl_handles.size() == 0) - curl = curl_easy_init(); - else { - curl = curl_handles.top(); - curl_handles.pop(); - } - curl_easy_reset(curl); - long seconds = 10; - //###curl_easy_setopt(curl, CURLOPT_TIMEOUT, seconds); // bad idea - curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, seconds); + auto_lock lock(curl_handles_lock); + if (curl_handles.size() == 0) + curl = curl_easy_init(); + else { + curl = curl_handles.top(); + curl_handles.pop(); } + curl_easy_reset(curl); + long seconds = 10; + curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, seconds); + return curl; +} + +static void +return_curl_handle(CURL* curl_handle) { + if (curl_handle != 0) { + auto_lock lock(curl_handles_lock); + curl_handles.push(curl_handle); + } +} + +class auto_curl { + CURL* curl_handle; +public: + auto_curl(): curl_handle(alloc_curl_handle()) { + } +// auto_curl(CURL* curl): curl(curl) { +//// auto_lock lock(curl_handles_lock); +//// if (curl_handles.size() == 0) +//// curl = curl_easy_init(); +//// else { +//// curl = curl_handles.top(); +//// curl_handles.pop(); +//// } +//// curl_easy_reset(curl); +//// long seconds = 10; +//// //###curl_easy_setopt(curl, CURLOPT_TIMEOUT, seconds); // bad idea +//// curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, seconds); +// } ~auto_curl() { - if (curl != 0) { - auto_lock lock(curl_handles_lock); - curl_handles.push(curl); + if (curl_handle != 0) { + return_curl_handle(curl_handle); +// auto_lock lock(curl_handles_lock); +// curl_handles.push(curl); } } CURL* get() const { - return curl; + return curl_handle; } // CURL* release() { // CURL* tmp = curl; @@ -154,7 +181,35 @@ public: // this->curl = curl; // } operator CURL*() const { - return curl; + return curl_handle; + } +}; + +struct curl_multi_remove_handle_functor { + CURLM* multi_handle; + curl_multi_remove_handle_functor(CURLM* multi_handle): multi_handle(multi_handle) { + } + void operator()(CURL* curl_handle) { + curl_multi_remove_handle(multi_handle, curl_handle); + return_curl_handle(curl_handle); + } +}; + +class auto_curl_multi { + CURLM* multi_handle; + vector curl_handles; +public: + auto_curl_multi(): multi_handle(curl_multi_init()) { + } + ~auto_curl_multi() { + curl_multi_cleanup(for_each(curl_handles.begin(), curl_handles.end(), curl_multi_remove_handle_functor(multi_handle)).multi_handle); + } + CURLM* get() const { + return multi_handle; + } + void add_curl(CURL* curl_handle) { + curl_handles.push_back(curl_handle); + curl_multi_add_handle(multi_handle, curl_handle); } }; @@ -216,6 +271,11 @@ static mode_t root_mode = 0; // if .size()==0 then local file cache is disabled static string use_cache; +// key=path +typedef map stat_cache_t; +static stat_cache_t stat_cache; +static pthread_mutex_t stat_cache_lock; + static const char hexAlphabet[] = "0123456789ABCDEF"; /** @@ -338,7 +398,7 @@ writeCallback(void* data, size_t blockSize, size_t numBlocks, void* userPtr) { return blockSize*numBlocks; } -static size_t headerCallback(void *data, +static size_t header_callback(void *data, size_t blockSize, size_t numBlocks, void *userPtr) { @@ -403,7 +463,7 @@ get_headers(const char* path, headers_t& meta) { headers_t responseHeaders; curl_easy_setopt(curl, CURLOPT_HEADERDATA, &responseHeaders); - curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, headerCallback); + curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_callback); auto_curl_slist headers; string date = get_date(); @@ -595,6 +655,16 @@ s3fs_getattr(const char *path, struct stat *stbuf) { return 0; } + { + auto_lock lock(stat_cache_lock); + stat_cache_t::iterator iter = stat_cache.find(path); + if (iter != stat_cache.end()) { + *stbuf = (*iter).second; + stat_cache.erase(path); + return 0; + } + } + string resource = urlEncode("/"+bucket + path); string url = host + resource; @@ -607,7 +677,7 @@ s3fs_getattr(const char *path, struct stat *stbuf) { headers_t responseHeaders; curl_easy_setopt(curl, CURLOPT_HEADERDATA, &responseHeaders); - curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, headerCallback); + curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_callback); auto_curl_slist headers; string date = get_date(); @@ -619,16 +689,14 @@ s3fs_getattr(const char *path, struct stat *stbuf) { stbuf->st_nlink = 1; // see fuse faq - //stbuf->st_mtime = atol(responseHeaders["x-amz-meta-mtime"].c_str()); - stbuf->st_mtime = strtol(responseHeaders["x-amz-meta-mtime"].c_str(), (char **)NULL, 10); + stbuf->st_mtime = strtoul(responseHeaders["x-amz-meta-mtime"].c_str(), (char **)NULL, 10); if (stbuf->st_mtime == 0) { long LastModified; if (curl_easy_getinfo(curl, CURLINFO_FILETIME, &LastModified) == 0) stbuf->st_mtime = LastModified; } - //stbuf->st_mode = atoi(responseHeaders["x-amz-meta-mode"].c_str()); - stbuf->st_mode = strtol(responseHeaders["x-amz-meta-mode"].c_str(), (char **)NULL, 10); + stbuf->st_mode = strtoul(responseHeaders["x-amz-meta-mode"].c_str(), (char **)NULL, 10); char* ContentType = 0; if (curl_easy_getinfo(curl, CURLINFO_CONTENT_TYPE, &ContentType) == 0) { if (ContentType) @@ -919,6 +987,38 @@ my_timegm (struct tm *tm) { return ret; } +// All this "stuff" stuff is kinda ugly... it works though... needs cleanup +struct stuff_t { + // default ctor works + string path; + string* url; + struct curl_slist* requestHeaders; + headers_t* responseHeaders; +}; +typedef map stuffMap_t; + +struct cleanup_stuff { + void operator()(pair qqq) { + stuff_t stuff = qqq.second; + delete stuff.url; + curl_slist_free_all(stuff.requestHeaders); + delete stuff.responseHeaders; + } +}; +class auto_stuff { + stuffMap_t stuffMap; +public: + auto_stuff() { + + } + ~auto_stuff() { + for_each(stuffMap.begin(), stuffMap.end(), cleanup_stuff()); + } + stuffMap_t& get() { + return stuffMap; + } +}; + static int s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, struct fuse_file_info *fi) { //cout << "readdir:"<< " path="<< path << endl; @@ -939,6 +1039,7 @@ s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, string url = host + resource + "?"+ query; + { auto_curl curl; curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_FAILONERROR, true); @@ -946,9 +1047,9 @@ s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, curl_easy_setopt(curl, CURLOPT_WRITEDATA, &responseText); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallback); - headers_t headersMap; - curl_easy_setopt(curl, CURLOPT_HEADERDATA, &headersMap); - curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, headerCallback); +// headers_t responseHeaders; +// curl_easy_setopt(curl, CURLOPT_HEADERDATA, &responseHeaders); +// curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_callback); auto_curl_slist headers; string date = get_date(); @@ -956,9 +1057,13 @@ s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, headers.append("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("GET", "", date, headers.get(), resource)); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get()); - - VERIFY(my_curl_easy_perform(curl.get())); - + + VERIFY(my_curl_easy_perform(curl.get())); + } + + auto_stuff curlMap; + auto_curl_multi multi_handle; + { xmlDocPtr doc = xmlReadMemory(responseText.c_str(), responseText.size(), "", NULL, 0); if (doc != NULL&& doc->children != NULL) { @@ -991,6 +1096,38 @@ s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, if (Key.size() > 0) { if (filler(buf, mybasename(Key).c_str(), 0, 0)) break; + + CURL* curl_handle = alloc_curl_handle(); + + string resource = urlEncode("/"+bucket + "/" + Key); + string url = host + resource; + + stuff_t stuff; + stuff.path = "/"+Key; + + // libcurl 7.17 does deep copy of url... e.g., fc7 has libcurl 7.16... therefore, must deep copy "stable" url... + stuff.url = new string(url); + stuff.requestHeaders = 0; + stuff.responseHeaders = new headers_t; + + curl_easy_setopt(curl_handle, CURLOPT_URL, stuff.url->c_str()); + curl_easy_setopt(curl_handle, CURLOPT_FAILONERROR, true); + curl_easy_setopt(curl_handle, CURLOPT_FOLLOWLOCATION, true); + curl_easy_setopt(curl_handle, CURLOPT_NOBODY, true); // HEAD + curl_easy_setopt(curl_handle, CURLOPT_FILETIME, true); // Last-Modified + + // requestHeaders + string date = get_date(); + stuff.requestHeaders = curl_slist_append(stuff.requestHeaders, string("Date: "+date).c_str()); + stuff.requestHeaders = curl_slist_append(stuff.requestHeaders, string("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("HEAD", "", date, stuff.requestHeaders, resource)).c_str()); + curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, stuff.requestHeaders); + + // responseHeaders + curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, stuff.responseHeaders); + curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_callback); + + curlMap.get()[curl_handle] = stuff; + multi_handle.add_curl(curl_handle); } } } @@ -998,7 +1135,84 @@ s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, } xmlFreeDoc(doc); } - } + + int running_handles; + + while (curl_multi_perform(multi_handle.get(), &running_handles) == CURLM_CALL_MULTI_PERFORM) + cout << "[1]" << endl; + + while (running_handles) { + fd_set read_fd_set; + fd_set write_fd_set; + fd_set exc_fd_set; + + FD_ZERO(&read_fd_set); + FD_ZERO(&write_fd_set); + FD_ZERO(&exc_fd_set); + + time_t milliseconds; + VERIFY(curl_multi_timeout(multi_handle.get(), &milliseconds)); + + if (milliseconds > 0) { + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 1000*milliseconds; + + int max_fd; + VERIFY(curl_multi_fdset(multi_handle.get(), &read_fd_set, &write_fd_set, &exc_fd_set, &max_fd)); + + if (select(max_fd + 1, &read_fd_set, &write_fd_set, &exc_fd_set, &timeout) == -1) + Yikes(-errno); + } + + while (curl_multi_perform(multi_handle.get(), &running_handles) == CURLM_CALL_MULTI_PERFORM) + cout << "[2]" << endl; + } + + int remaining_msgs = 1; + while (remaining_msgs) { + // this next line pegs cpu for directories w/lotsa files + CURLMsg* msg = curl_multi_info_read(multi_handle.get(), &remaining_msgs); + if (msg != NULL) { + CURLcode code =msg->data.result; + if (code != 0) + cout << "###" << code << curl_easy_strerror(code) << endl; + if (code == 0) { + CURL* curl_handle = msg->easy_handle; + stuff_t stuff = curlMap.get()[curl_handle]; + + struct stat st; + memset(&st, 0, sizeof(st)); + st.st_nlink = 1; // see fuse faq + // mode + st.st_mode = strtoul((*stuff.responseHeaders)["x-amz-meta-mode"].c_str(), (char **)NULL, 10); + char* ContentType = 0; + if (curl_easy_getinfo(curl_handle, CURLINFO_CONTENT_TYPE, &ContentType) == 0) { + if (ContentType) + st.st_mode |= strcmp(ContentType, "application/x-directory") == 0 ? S_IFDIR : S_IFREG; + } + // mtime + st.st_mtime = strtoul((*stuff.responseHeaders)["x-amz-meta-mtime"].c_str(), (char **)NULL, 10); + if (st.st_mtime == 0) { + long LastModified; + if (curl_easy_getinfo(curl_handle, CURLINFO_FILETIME, &LastModified) == 0) + st.st_mtime = LastModified; + } + // size + double ContentLength; + if (curl_easy_getinfo(curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &ContentLength) == 0) + st.st_size = static_cast(ContentLength); + // blocks + if (S_ISREG(st.st_mode)) + st.st_blocks = st.st_size / 512 + 1; + + auto_lock lock(stat_cache_lock); + stat_cache[stuff.path] = st; + } + } + } + + } // IsTruncated return 0; } @@ -1008,6 +1222,7 @@ s3fs_init(struct fuse_conn_info *conn) { printf("init\n"); pthread_mutex_init(&curl_handles_lock, NULL); pthread_mutex_init(&s3fs_descriptors_lock, NULL); + pthread_mutex_init(&stat_cache_lock, NULL); return 0; } @@ -1016,6 +1231,7 @@ s3fs_destroy(void*) { printf("destroy\n"); pthread_mutex_destroy(&curl_handles_lock); pthread_mutex_destroy(&s3fs_descriptors_lock); + pthread_mutex_destroy(&stat_cache_lock); } static int