readdir: parallel

git-svn-id: http://s3fs.googlecode.com/svn/trunk@92 df820570-a93a-0410-bd06-b72b767a4274
This commit is contained in:
rrizun 2008-02-23 15:35:40 +00:00
parent 891e85e71d
commit 8484ddab8a

View File

@ -1,5 +1,5 @@
/*
* s3fs - FUSE filesystem backed by Amazon S3
* s3fs - FUSE-based file system backed by Amazon S3
*
* Copyright 2007-2008 Randy Rizun <rrizun@gmail.com>
*
@ -116,30 +116,57 @@ public:
stack<CURL*> curl_handles;
static pthread_mutex_t curl_handles_lock;
class auto_curl {
static CURL*
alloc_curl_handle() {
CURL* curl;
public:
auto_curl() {
auto_lock lock(curl_handles_lock);
if (curl_handles.size() == 0)
curl = curl_easy_init();
else {
curl = curl_handles.top();
curl_handles.pop();
}
curl_easy_reset(curl);
long seconds = 10;
//###curl_easy_setopt(curl, CURLOPT_TIMEOUT, seconds); // bad idea
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, seconds);
auto_lock lock(curl_handles_lock);
if (curl_handles.size() == 0)
curl = curl_easy_init();
else {
curl = curl_handles.top();
curl_handles.pop();
}
curl_easy_reset(curl);
long seconds = 10;
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, seconds);
return curl;
}
static void
return_curl_handle(CURL* curl_handle) {
if (curl_handle != 0) {
auto_lock lock(curl_handles_lock);
curl_handles.push(curl_handle);
}
}
class auto_curl {
CURL* curl_handle;
public:
auto_curl(): curl_handle(alloc_curl_handle()) {
}
// auto_curl(CURL* curl): curl(curl) {
//// auto_lock lock(curl_handles_lock);
//// if (curl_handles.size() == 0)
//// curl = curl_easy_init();
//// else {
//// curl = curl_handles.top();
//// curl_handles.pop();
//// }
//// curl_easy_reset(curl);
//// long seconds = 10;
//// //###curl_easy_setopt(curl, CURLOPT_TIMEOUT, seconds); // bad idea
//// curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, seconds);
// }
~auto_curl() {
if (curl != 0) {
auto_lock lock(curl_handles_lock);
curl_handles.push(curl);
if (curl_handle != 0) {
return_curl_handle(curl_handle);
// auto_lock lock(curl_handles_lock);
// curl_handles.push(curl);
}
}
CURL* get() const {
return curl;
return curl_handle;
}
// CURL* release() {
// CURL* tmp = curl;
@ -154,7 +181,35 @@ public:
// this->curl = curl;
// }
operator CURL*() const {
return curl;
return curl_handle;
}
};
struct curl_multi_remove_handle_functor {
CURLM* multi_handle;
curl_multi_remove_handle_functor(CURLM* multi_handle): multi_handle(multi_handle) {
}
void operator()(CURL* curl_handle) {
curl_multi_remove_handle(multi_handle, curl_handle);
return_curl_handle(curl_handle);
}
};
class auto_curl_multi {
CURLM* multi_handle;
vector<CURL*> curl_handles;
public:
auto_curl_multi(): multi_handle(curl_multi_init()) {
}
~auto_curl_multi() {
curl_multi_cleanup(for_each(curl_handles.begin(), curl_handles.end(), curl_multi_remove_handle_functor(multi_handle)).multi_handle);
}
CURLM* get() const {
return multi_handle;
}
void add_curl(CURL* curl_handle) {
curl_handles.push_back(curl_handle);
curl_multi_add_handle(multi_handle, curl_handle);
}
};
@ -216,6 +271,11 @@ static mode_t root_mode = 0;
// if .size()==0 then local file cache is disabled
static string use_cache;
// key=path
typedef map<string, struct stat> stat_cache_t;
static stat_cache_t stat_cache;
static pthread_mutex_t stat_cache_lock;
static const char hexAlphabet[] = "0123456789ABCDEF";
/**
@ -338,7 +398,7 @@ writeCallback(void* data, size_t blockSize, size_t numBlocks, void* userPtr) {
return blockSize*numBlocks;
}
static size_t headerCallback(void *data,
static size_t header_callback(void *data,
size_t blockSize,
size_t numBlocks,
void *userPtr) {
@ -403,7 +463,7 @@ get_headers(const char* path, headers_t& meta) {
headers_t responseHeaders;
curl_easy_setopt(curl, CURLOPT_HEADERDATA, &responseHeaders);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, headerCallback);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_callback);
auto_curl_slist headers;
string date = get_date();
@ -595,6 +655,16 @@ s3fs_getattr(const char *path, struct stat *stbuf) {
return 0;
}
{
auto_lock lock(stat_cache_lock);
stat_cache_t::iterator iter = stat_cache.find(path);
if (iter != stat_cache.end()) {
*stbuf = (*iter).second;
stat_cache.erase(path);
return 0;
}
}
string resource = urlEncode("/"+bucket + path);
string url = host + resource;
@ -607,7 +677,7 @@ s3fs_getattr(const char *path, struct stat *stbuf) {
headers_t responseHeaders;
curl_easy_setopt(curl, CURLOPT_HEADERDATA, &responseHeaders);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, headerCallback);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_callback);
auto_curl_slist headers;
string date = get_date();
@ -619,16 +689,14 @@ s3fs_getattr(const char *path, struct stat *stbuf) {
stbuf->st_nlink = 1; // see fuse faq
//stbuf->st_mtime = atol(responseHeaders["x-amz-meta-mtime"].c_str());
stbuf->st_mtime = strtol(responseHeaders["x-amz-meta-mtime"].c_str(), (char **)NULL, 10);
stbuf->st_mtime = strtoul(responseHeaders["x-amz-meta-mtime"].c_str(), (char **)NULL, 10);
if (stbuf->st_mtime == 0) {
long LastModified;
if (curl_easy_getinfo(curl, CURLINFO_FILETIME, &LastModified) == 0)
stbuf->st_mtime = LastModified;
}
//stbuf->st_mode = atoi(responseHeaders["x-amz-meta-mode"].c_str());
stbuf->st_mode = strtol(responseHeaders["x-amz-meta-mode"].c_str(), (char **)NULL, 10);
stbuf->st_mode = strtoul(responseHeaders["x-amz-meta-mode"].c_str(), (char **)NULL, 10);
char* ContentType = 0;
if (curl_easy_getinfo(curl, CURLINFO_CONTENT_TYPE, &ContentType) == 0) {
if (ContentType)
@ -919,6 +987,38 @@ my_timegm (struct tm *tm) {
return ret;
}
// All this "stuff" stuff is kinda ugly... it works though... needs cleanup
struct stuff_t {
// default ctor works
string path;
string* url;
struct curl_slist* requestHeaders;
headers_t* responseHeaders;
};
typedef map<CURL*, stuff_t> stuffMap_t;
struct cleanup_stuff {
void operator()(pair<CURL*, stuff_t> qqq) {
stuff_t stuff = qqq.second;
delete stuff.url;
curl_slist_free_all(stuff.requestHeaders);
delete stuff.responseHeaders;
}
};
class auto_stuff {
stuffMap_t stuffMap;
public:
auto_stuff() {
}
~auto_stuff() {
for_each(stuffMap.begin(), stuffMap.end(), cleanup_stuff());
}
stuffMap_t& get() {
return stuffMap;
}
};
static int
s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, struct fuse_file_info *fi) {
//cout << "readdir:"<< " path="<< path << endl;
@ -939,6 +1039,7 @@ s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset,
string url = host + resource + "?"+ query;
{
auto_curl curl;
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_FAILONERROR, true);
@ -946,9 +1047,9 @@ s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset,
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &responseText);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallback);
headers_t headersMap;
curl_easy_setopt(curl, CURLOPT_HEADERDATA, &headersMap);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, headerCallback);
// headers_t responseHeaders;
// curl_easy_setopt(curl, CURLOPT_HEADERDATA, &responseHeaders);
// curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_callback);
auto_curl_slist headers;
string date = get_date();
@ -956,9 +1057,13 @@ s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset,
headers.append("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("GET", "", date, headers.get(), resource));
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get());
VERIFY(my_curl_easy_perform(curl.get()));
VERIFY(my_curl_easy_perform(curl.get()));
}
auto_stuff curlMap;
auto_curl_multi multi_handle;
{
xmlDocPtr doc = xmlReadMemory(responseText.c_str(), responseText.size(), "", NULL, 0);
if (doc != NULL&& doc->children != NULL) {
@ -991,6 +1096,38 @@ s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset,
if (Key.size() > 0) {
if (filler(buf, mybasename(Key).c_str(), 0, 0))
break;
CURL* curl_handle = alloc_curl_handle();
string resource = urlEncode("/"+bucket + "/" + Key);
string url = host + resource;
stuff_t stuff;
stuff.path = "/"+Key;
// libcurl 7.17 does deep copy of url... e.g., fc7 has libcurl 7.16... therefore, must deep copy "stable" url...
stuff.url = new string(url);
stuff.requestHeaders = 0;
stuff.responseHeaders = new headers_t;
curl_easy_setopt(curl_handle, CURLOPT_URL, stuff.url->c_str());
curl_easy_setopt(curl_handle, CURLOPT_FAILONERROR, true);
curl_easy_setopt(curl_handle, CURLOPT_FOLLOWLOCATION, true);
curl_easy_setopt(curl_handle, CURLOPT_NOBODY, true); // HEAD
curl_easy_setopt(curl_handle, CURLOPT_FILETIME, true); // Last-Modified
// requestHeaders
string date = get_date();
stuff.requestHeaders = curl_slist_append(stuff.requestHeaders, string("Date: "+date).c_str());
stuff.requestHeaders = curl_slist_append(stuff.requestHeaders, string("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("HEAD", "", date, stuff.requestHeaders, resource)).c_str());
curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, stuff.requestHeaders);
// responseHeaders
curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, stuff.responseHeaders);
curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_callback);
curlMap.get()[curl_handle] = stuff;
multi_handle.add_curl(curl_handle);
}
}
}
@ -998,7 +1135,84 @@ s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset,
}
xmlFreeDoc(doc);
}
}
int running_handles;
while (curl_multi_perform(multi_handle.get(), &running_handles) == CURLM_CALL_MULTI_PERFORM)
cout << "[1]" << endl;
while (running_handles) {
fd_set read_fd_set;
fd_set write_fd_set;
fd_set exc_fd_set;
FD_ZERO(&read_fd_set);
FD_ZERO(&write_fd_set);
FD_ZERO(&exc_fd_set);
time_t milliseconds;
VERIFY(curl_multi_timeout(multi_handle.get(), &milliseconds));
if (milliseconds > 0) {
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 1000*milliseconds;
int max_fd;
VERIFY(curl_multi_fdset(multi_handle.get(), &read_fd_set, &write_fd_set, &exc_fd_set, &max_fd));
if (select(max_fd + 1, &read_fd_set, &write_fd_set, &exc_fd_set, &timeout) == -1)
Yikes(-errno);
}
while (curl_multi_perform(multi_handle.get(), &running_handles) == CURLM_CALL_MULTI_PERFORM)
cout << "[2]" << endl;
}
int remaining_msgs = 1;
while (remaining_msgs) {
// this next line pegs cpu for directories w/lotsa files
CURLMsg* msg = curl_multi_info_read(multi_handle.get(), &remaining_msgs);
if (msg != NULL) {
CURLcode code =msg->data.result;
if (code != 0)
cout << "###" << code << curl_easy_strerror(code) << endl;
if (code == 0) {
CURL* curl_handle = msg->easy_handle;
stuff_t stuff = curlMap.get()[curl_handle];
struct stat st;
memset(&st, 0, sizeof(st));
st.st_nlink = 1; // see fuse faq
// mode
st.st_mode = strtoul((*stuff.responseHeaders)["x-amz-meta-mode"].c_str(), (char **)NULL, 10);
char* ContentType = 0;
if (curl_easy_getinfo(curl_handle, CURLINFO_CONTENT_TYPE, &ContentType) == 0) {
if (ContentType)
st.st_mode |= strcmp(ContentType, "application/x-directory") == 0 ? S_IFDIR : S_IFREG;
}
// mtime
st.st_mtime = strtoul((*stuff.responseHeaders)["x-amz-meta-mtime"].c_str(), (char **)NULL, 10);
if (st.st_mtime == 0) {
long LastModified;
if (curl_easy_getinfo(curl_handle, CURLINFO_FILETIME, &LastModified) == 0)
st.st_mtime = LastModified;
}
// size
double ContentLength;
if (curl_easy_getinfo(curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &ContentLength) == 0)
st.st_size = static_cast<off_t>(ContentLength);
// blocks
if (S_ISREG(st.st_mode))
st.st_blocks = st.st_size / 512 + 1;
auto_lock lock(stat_cache_lock);
stat_cache[stuff.path] = st;
}
}
}
} // IsTruncated
return 0;
}
@ -1008,6 +1222,7 @@ s3fs_init(struct fuse_conn_info *conn) {
printf("init\n");
pthread_mutex_init(&curl_handles_lock, NULL);
pthread_mutex_init(&s3fs_descriptors_lock, NULL);
pthread_mutex_init(&stat_cache_lock, NULL);
return 0;
}
@ -1016,6 +1231,7 @@ s3fs_destroy(void*) {
printf("destroy\n");
pthread_mutex_destroy(&curl_handles_lock);
pthread_mutex_destroy(&s3fs_descriptors_lock);
pthread_mutex_destroy(&stat_cache_lock);
}
static int