From 3274f58948bd1f2ea135ffeab2771ee9a3d70567 Mon Sep 17 00:00:00 2001
From: "ggtakec@gmail.com"
Date: Tue, 23 Jul 2013 16:01:48 +0000
Subject: [PATCH] Changes codes for performance(part 3)

* Summary
This revision includes a big change in how the temporary file and the local cache file are handled. With this change, s3fs performs well when it opens/closes/syncs/reads an object.

* Detail
1) About temporary file(local file)
s3fs uses a temporary file on the local file system when it downloads/uploads/opens/seeks an object on S3. After this revision, s3fs calls the ftruncate() function when it creates the temporary file, so it can set the file to exactly the right size without downloading anything.
(Notice - the ftruncate function is for XSI-compliant systems, so you may have a problem on non-XSI-compliant systems.)
With this change, s3fs can download a part of an object by sending a request with a "Range" HTTP header; in effect, the object is downloaded block by block. The default block(part) size is 50MB, which is the default parallel request count(5) multiplied by the default multipart upload size(10MB). If you need to change this block size, you can do so with the new "fd_page_size" option, which accepts any value of 1MB(1024 * 1024) or more. Note that fdcache.cpp(and fdcache.h) were changed a lot for this.

2) About local cache
Local cache files in the directory specified by the "use_cache" option do not always contain all of the object data, because s3fs uses the ftruncate function and reads(writes) the temporary file in block units. s3fs manages the status of each block unit, that is, whether the area is "downloaded" or not. To hold this status, s3fs creates a new status file in the cache directory specified by the "use_cache" option; the status files are kept under the cache directory in a subdirectory named "." + <bucket name> + ".stat". When s3fs opens a status file, it locks the file for exclusive control by calling the flock function. Take care that, because of this, the status files can not be placed on a network drive(like NFS).
This revision also changes the file open mode: s3fs now always opens a local cache file and each status file in writable mode.
Last, this revision adds the new option "del_cache", which makes s3fs delete all local cache files when it starts and exits.

3) Uploading
When s3fs writes data to a file descriptor through a FUSE request, the old s3fs revision downloaded the whole object first. The new revision does not download everything; it downloads only the small partial area(some block units) that includes the area being written. When s3fs closes or flushes the file descriptor, it downloads the remaining areas that have not yet been downloaded from the server, and after that it uploads all of the data. Revision r456 already added the parallel upload function, so this revision together with r456 and r457 is a very big change for performance.

4) Downloading
Because of the changes to the temporary file and the local cache file, when s3fs downloads an object it downloads only the required range(some block units). s3fs downloads those units with parallel GET requests, the same as for uploading. (The maximum parallel request count and the size of each download are controlled by the same parameters as for uploading.)
In the new revision, when s3fs opens a file it returns the file descriptor immediately, because it only opens(creates) the file descriptor without downloading any data. When s3fs reads data, it downloads only the block units that include the requested area. This is good for performance.
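As an illustration of the block(page) management described in 1), 2) and 4) above, here is a minimal sketch of the idea. This is not the actual PageList/FdEntity implementation from fdcache.cpp; the structure and function names below are made up for the example, and only standard C++ is used:

    #include <sys/types.h>   // off_t
    #include <string>
    #include <sstream>
    #include <vector>

    // One management page: a fixed-size block of the object plus a flag that
    // records whether its data has been downloaded into the local file yet.
    struct Page {
      off_t  offset;   // start offset of this page within the object
      size_t bytes;    // length of this page (the last page may be shorter)
      bool   loaded;   // true once this range has been downloaded
      Page(off_t o, size_t b) : offset(o), bytes(b), loaded(false) {}
    };

    // Split an object of "size" bytes into pages of "page_size" bytes
    // (the default page size in this patch is 50MB, settable via fd_page_size).
    std::vector<Page> init_pages(size_t size, size_t page_size)
    {
      std::vector<Page> pages;
      for(size_t pos = 0; pos < size; pos += page_size){
        size_t remain = size - pos;
        pages.push_back(Page((off_t)pos, remain < page_size ? remain : page_size));
      }
      return pages;
    }

    // Build the value of the HTTP "Range" header for one not-yet-loaded page,
    // e.g. "bytes=52428800-104857599" for the second 50MB page.
    std::string range_header_value(const Page& page)
    {
      std::ostringstream ss;
      ss << "bytes=" << page.offset << "-" << (page.offset + (off_t)page.bytes - 1);
      return ss.str();
    }

    // Mark every page overlapping [start, start + length) as loaded, e.g.
    // after the ranged GET for that area succeeded.
    void mark_loaded(std::vector<Page>& pages, off_t start, size_t length)
    {
      off_t end = start + (off_t)length;
      for(size_t i = 0; i < pages.size(); i++){
        off_t pend = pages[i].offset + (off_t)pages[i].bytes;
        if(pages[i].offset < end && start < pend){   // the two ranges overlap
          pages[i].loaded = true;
        }
      }
    }

In this scheme a ranged GET is issued only for pages that are still marked as not loaded; the real code additionally serializes the loaded/not-loaded state of each page into the status file under the cache directory, so a partially cached file can be reused later.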
5) Changes option name The option "parallel_upload" which added at r456 is changed to new option name as "parallel_count". This reason is this option value is not only used by uploading object, but a uploading object also uses this option. (For a while, you can use old option name "parallel_upload" for compatibility.) git-svn-id: http://s3fs.googlecode.com/svn/trunk@458 df820570-a93a-0410-bd06-b72b767a4274 --- doc/man/s3fs.1 | 10 +- src/common.h | 7 + src/curl.cpp | 192 +++++-- src/curl.h | 14 +- src/fdcache.cpp | 1324 +++++++++++++++++++++++++++++++++++++-------- src/fdcache.h | 178 ++++-- src/s3fs.cpp | 826 +++++++++++----------------- src/s3fs_util.cpp | 113 +++- src/s3fs_util.h | 15 + 9 files changed, 1881 insertions(+), 798 deletions(-) diff --git a/doc/man/s3fs.1 b/doc/man/s3fs.1 index 767bcf7..fc3f7e1 100644 --- a/doc/man/s3fs.1 +++ b/doc/man/s3fs.1 @@ -59,6 +59,9 @@ number of times to retry a failed S3 transaction. \fB\-o\fR use_cache (default="" which means disabled) local folder to use for local file cache. .TP +\fB\-o\fR del_cache - delete local file cache +delete local file cache when s3fs starts and exits. +.TP \fB\-o\fR use_rrs (default is disable) use Amazon's Reduced Redundancy Storage. this option can not be specified with use_sse. @@ -99,12 +102,17 @@ s3fs is always using dns cache, this option make dns cache disable. \fB\-o\fR multireq_max (default="500") maximum number of parallel request for listing objects. .TP -\fB\-o\fR parallel_upload (default="5") +\fB\-o\fR parallel_count (default="5") number of parallel request for uploading big objects. s3fs uploads large object(over 20MB) by multipart post request, and sends parallel requests. This option limits parallel request count which s3fs requests at once. It is necessary to set this value depending on a CPU and a network band. .TP +\fB\-o\fR fd_page_size(default="52428800"(50MB)) +number of internal management page size for each file discriptor. +For delayed reading and writing by s3fs, s3fs manages pages which is separated from object. Each pages has a status that data is already loaded(or not loaded yet). +This option should not be changed when you don't have a trouble with performance. +.TP \fB\-o\fR url (default="http://s3.amazonaws.com") sets the url to use to access Amazon S3. If you want to use HTTPS, then you can set url=https://s3.amazonaws.com .TP diff --git a/src/common.h b/src/common.h index 55b5ee0..1bcffb6 100644 --- a/src/common.h +++ b/src/common.h @@ -23,6 +23,11 @@ printf(__VA_ARGS__); \ } +#define FGPRINT2(...) \ + if(foreground2){ \ + printf(__VA_ARGS__); \ + } + #define SAFESTRPTR(strptr) (strptr ? 
strptr : "") // @@ -35,6 +40,8 @@ typedef std::map headers_t; // extern bool debug; extern bool foreground; +extern bool foreground2; +extern bool nomultipart; extern std::string program_name; extern std::string service_path; extern std::string host; diff --git a/src/curl.cpp b/src/curl.cpp index 4b7b8fb..97ef690 100644 --- a/src/curl.cpp +++ b/src/curl.cpp @@ -150,7 +150,7 @@ curltime_t S3fsCurl::curl_times; curlprogress_t S3fsCurl::curl_progress; string S3fsCurl::curl_ca_bundle; mimes_t S3fsCurl::mimeTypes; -int S3fsCurl::max_parallel_upload = 5; // default +int S3fsCurl::max_parallel_cnt = 5; // default //------------------------------------------------------------------- // Class methods for S3fsCurl @@ -469,7 +469,7 @@ bool S3fsCurl::LocateBundle(void) return true; } -size_t S3fsCurl::WriteMemoryCallback(void *ptr, size_t blockSize, size_t numBlocks, void *data) +size_t S3fsCurl::WriteMemoryCallback(void* ptr, size_t blockSize, size_t numBlocks, void* data) { BodyData* body = (BodyData*)data; @@ -481,7 +481,7 @@ size_t S3fsCurl::WriteMemoryCallback(void *ptr, size_t blockSize, size_t numBloc return (blockSize * numBlocks); } -size_t S3fsCurl::ReadCallback(void *ptr, size_t size, size_t nmemb, void *userp) +size_t S3fsCurl::ReadCallback(void* ptr, size_t size, size_t nmemb, void* userp) { S3fsCurl* pCurl = reinterpret_cast(userp); @@ -500,7 +500,7 @@ size_t S3fsCurl::ReadCallback(void *ptr, size_t size, size_t nmemb, void *userp) return copysize; } -size_t S3fsCurl::HeaderCallback(void *data, size_t blockSize, size_t numBlocks, void *userPtr) +size_t S3fsCurl::HeaderCallback(void* data, size_t blockSize, size_t numBlocks, void* userPtr) { headers_t* headers = reinterpret_cast(userPtr); string header(reinterpret_cast(data), blockSize * numBlocks); @@ -521,7 +521,7 @@ size_t S3fsCurl::HeaderCallback(void *data, size_t blockSize, size_t numBlocks, return blockSize * numBlocks; } -size_t S3fsCurl::UploadReadCallback(void *ptr, size_t size, size_t nmemb, void *userp) +size_t S3fsCurl::UploadReadCallback(void* ptr, size_t size, size_t nmemb, void* userp) { S3fsCurl* pCurl = reinterpret_cast(userp); @@ -554,6 +554,41 @@ size_t S3fsCurl::UploadReadCallback(void *ptr, size_t size, size_t nmemb, void * return totalread; } +size_t S3fsCurl::DownloadWriteCallback(void* ptr, size_t size, size_t nmemb, void* userp) +{ + S3fsCurl* pCurl = reinterpret_cast(userp); + + if(1 > (size * nmemb)){ + return 0; + } + if(-1 == pCurl->partdata.fd || 0 >= pCurl->partdata.size){ + return 0; + } + + // write size + ssize_t copysize = (size * nmemb) < (size_t)pCurl->partdata.size ? (size * nmemb) : (size_t)pCurl->partdata.size; + ssize_t writebytes; + ssize_t totalwrite; + + // write + for(totalwrite = 0, writebytes = 0; totalwrite < copysize; totalwrite += writebytes){ + writebytes = pwrite(pCurl->partdata.fd, &((char*)ptr)[totalwrite], (copysize - totalwrite), pCurl->partdata.startpos + totalwrite); + if(0 == writebytes){ + // eof? 
+ break; + }else if(-1 == writebytes){ + // error + FGPRINT("S3fsCurl::DownloadWriteCallback: write file error(%d).\n", errno); + SYSLOGERR("write file error(%d).", errno); + return 0; + } + } + pCurl->partdata.startpos += totalwrite; + pCurl->partdata.size -= totalwrite; + + return totalwrite; +} + bool S3fsCurl::SetDnsCache(bool isCache) { bool old = S3fsCurl::is_dns_cache; @@ -637,10 +672,10 @@ long S3fsCurl::SetSslVerifyHostname(long value) return old; } -int S3fsCurl::SetMaxParallelUpload(int value) +int S3fsCurl::SetMaxParallelCount(int value) { - int old = S3fsCurl::max_parallel_upload; - S3fsCurl::max_parallel_upload = value; + int old = S3fsCurl::max_parallel_cnt; + S3fsCurl::max_parallel_cnt = value; return old; } @@ -705,7 +740,6 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, etaglist_t list; off_t remaining_bytes; unsigned char* buf; - char tmpfile[256]; S3fsCurl s3fscurl; FGPRINT(" S3fsCurl::ParallelMultipartUploadRequest[tpath=%s][fd=%d]\n", SAFESTRPTR(tpath), fd); @@ -752,7 +786,7 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, curlmulti.SetRetryCallback(S3fsCurl::UploadMultipartPostRetryCallback); // Loop for setup parallel upload(multipart) request. - for(para_cnt = 0; para_cnt < S3fsCurl::max_parallel_upload && 0 < remaining_bytes; para_cnt++, remaining_bytes -= chunk){ + for(para_cnt = 0; para_cnt < S3fsCurl::max_parallel_cnt && 0 < remaining_bytes; para_cnt++, remaining_bytes -= chunk){ // chunk size chunk = remaining_bytes > MULTIPART_SIZE ? MULTIPART_SIZE : remaining_bytes; @@ -775,12 +809,12 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, // set into parallel object if(!curlmulti.SetS3fsCurlObject(s3fscurl_para)){ - FGPRINT("S3fsCurl::ParallelMultipartUploadRequest: Could not set curl object into multi curl(%s).\n", tmpfile); - SYSLOGERR("Could not make curl object into multi curl(%s).", tmpfile); + FGPRINT("S3fsCurl::ParallelMultipartUploadRequest: Could not set curl object into multi curl(%s).\n", tpath); + SYSLOGERR("Could not make curl object into multi curl(%s).", tpath); free(buf); fclose(file); delete s3fscurl_para; - return result; + return -1; } } @@ -803,6 +837,78 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, return 0; } +S3fsCurl* S3fsCurl::ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl) +{ + int result; + + if(!s3fscurl){ + return NULL; + } + // duplicate request(setup new curl object) + S3fsCurl* newcurl = new S3fsCurl(); + if(0 != (result = newcurl->PreGetObjectRequest( + s3fscurl->path.c_str(), s3fscurl->partdata.fd, s3fscurl->partdata.startpos, s3fscurl->partdata.size))){ + FGPRINT("S3fsCurl::ParallelGetObjectRetryCallback: failed downloading part setup(%d)\n", result); + SYSLOGERR("failed downloading part setup(%d)", result); + delete newcurl; + return NULL;; + } + return newcurl; +} + +int S3fsCurl::ParallelGetObjectRequest(const char* tpath, int fd, off_t start, ssize_t size) +{ + FGPRINT(" S3fsCurl::ParallelGetObjectRequest[tpath=%s][fd=%d]\n", SAFESTRPTR(tpath), fd); + + int result = 0; + ssize_t remaining_bytes; + + // cycle through open fd, pulling off 10MB chunks at a time + for(remaining_bytes = size; 0 < remaining_bytes; ){ + S3fsMultiCurl curlmulti; + int para_cnt; + off_t chunk; + + // Initialize S3fsMultiCurl + //curlmulti.SetSuccessCallback(NULL); // not need to set success callback + curlmulti.SetRetryCallback(S3fsCurl::ParallelGetObjectRetryCallback); + + // Loop for setup parallel 
upload(multipart) request. + for(para_cnt = 0; para_cnt < S3fsCurl::max_parallel_cnt && 0 < remaining_bytes; para_cnt++, remaining_bytes -= chunk){ + // chunk size + chunk = remaining_bytes > MULTIPART_SIZE ? MULTIPART_SIZE : remaining_bytes; + + // s3fscurl sub object + S3fsCurl* s3fscurl_para = new S3fsCurl(); + if(0 != (result = s3fscurl_para->PreGetObjectRequest(tpath, fd, (start + size - remaining_bytes), chunk))){ + FGPRINT("S3fsCurl::ParallelGetObjectRequest: failed downloading part setup(%d)\n", result); + SYSLOGERR("failed downloading part setup(%d)", result); + delete s3fscurl_para; + return result; + } + + // set into parallel object + if(!curlmulti.SetS3fsCurlObject(s3fscurl_para)){ + FGPRINT("S3fsCurl::ParallelGetObjectRequest: Could not set curl object into multi curl(%s).\n", tpath); + SYSLOGERR("Could not make curl object into multi curl(%s).", tpath); + delete s3fscurl_para; + return -1; + } + } + + // Multi request + if(0 != (result = curlmulti.Request())){ + FGPRINT("S3fsCurl::ParallelGetObjectRequest: error occuered in multi request(errno=%d).\n", result); + SYSLOGERR("error occuered in multi request(errno=%d).", result); + break; + } + + // reinit for loop. + curlmulti.Clear(); + } + return result; +} + //------------------------------------------------------------------- // Methods for S3fsCurl //------------------------------------------------------------------- @@ -1558,21 +1664,13 @@ int S3fsCurl::PutRequest(const char* tpath, headers_t& meta, int fd, bool ow_sse return result; } -int S3fsCurl::GetObjectRequest(const char* tpath, int fd) +int S3fsCurl::PreGetObjectRequest(const char* tpath, int fd, off_t start, ssize_t size) { - FILE* file; - int fd2; - FGPRINT(" S3fsCurl::GetRequest [tpath=%s]\n", SAFESTRPTR(tpath)); + FGPRINT(" S3fsCurl::PreGetRequest [tpath=%s][start=%zd][size=%zd]\n", SAFESTRPTR(tpath), start, size); - if(!tpath){ + if(!tpath || -1 == fd || 0 > start || 0 >= size){ return -1; } - // duplicate fd - if(-1 == (fd2 = dup(fd)) || 0 != lseek(fd2, 0, SEEK_SET) || NULL == (file = fdopen(fd2, "w+"))){ - FGPRINT("S3fsCurl::GetRequest : Cloud not duplicate file discriptor(errno=%d)\n", errno); - SYSLOGERR("Cloud not duplicate file discriptor(errno=%d)", errno); - return -errno; - } if(!CreateCurlHandle(true)){ return -1; @@ -1589,6 +1687,13 @@ int S3fsCurl::GetObjectRequest(const char* tpath, int fd) string date = get_date(); requestHeaders = curl_slist_sort_insert(requestHeaders, string("Date: " + date).c_str()); requestHeaders = curl_slist_sort_insert(requestHeaders, "Content-Type: "); + if(-1 != start && -1 != size){ + string range = "Range: bytes="; + range += str(start); + range += "-"; + range += str(start + size - 1); + requestHeaders = curl_slist_sort_insert(requestHeaders, range.c_str()); + } if(!S3fsCurl::IsPublicBucket()){ requestHeaders = curl_slist_sort_insert( @@ -1600,20 +1705,37 @@ int S3fsCurl::GetObjectRequest(const char* tpath, int fd) // setopt curl_easy_setopt(hCurl, CURLOPT_URL, url.c_str()); curl_easy_setopt(hCurl, CURLOPT_HTTPHEADER, requestHeaders); - curl_easy_setopt(hCurl, CURLOPT_FILE, file); + curl_easy_setopt(hCurl, CURLOPT_WRITEFUNCTION, S3fsCurl::DownloadWriteCallback); + curl_easy_setopt(hCurl, CURLOPT_WRITEDATA, (void*)this); + + // set info for callback func. + // (use only fd, startpos and size, other member is not used.) 
+ partdata.clear(); + partdata.fd = fd; + partdata.startpos = start; + partdata.size = size; + + return 0; +} + +int S3fsCurl::GetObjectRequest(const char* tpath, int fd, off_t start, ssize_t size) +{ + int result; + + FGPRINT(" S3fsCurl::GetRequest [tpath=%s][start=%zd][size=%zd]\n", SAFESTRPTR(tpath), start, size); + + if(!tpath){ + return -1; + } + if(0 != (result = PreGetObjectRequest(tpath, fd, start, size))){ + return result; + } FGPRINT(" downloading... [path=%s][fd=%d]\n", tpath, fd); SYSLOGDBG("LOCAL FD"); - int result = RequestPerform(); - - fflush(file); - fclose(file); - if(0 != lseek(fd, 0, SEEK_SET)){ - FGPRINT("S3fsCurl::GetRequest : Cloud not seek file discriptor(errno=%d)\n", errno); - SYSLOGERR("Cloud not seek file discriptor(errno=%d)", errno); - return -errno; - } + result = RequestPerform(); + partdata.clear(); return result; } @@ -1811,7 +1933,7 @@ int S3fsCurl::CompleteMultipartPostRequest(const char* tpath, string& upload_id, for(int cnt = 0; cnt < (int)parts.size(); cnt++){ if(0 == parts[cnt].length()){ FGPRINT("S3fsCurl::CompleteMultipartPostRequest : %d file part is not finished uploading.\n", cnt + 1); - return false; + return -1; } postContent += "\n"; postContent += " " + IntToStr(cnt + 1) + "\n"; diff --git a/src/curl.h b/src/curl.h index 9733eda..e304255 100644 --- a/src/curl.h +++ b/src/curl.h @@ -58,7 +58,7 @@ struct filepart clear(); } - void clear(bool isfree = true) + void clear(void) { uploaded = false; etag = ""; @@ -128,7 +128,7 @@ class S3fsCurl static curlprogress_t curl_progress; static std::string curl_ca_bundle; static mimes_t mimeTypes; - static int max_parallel_upload; + static int max_parallel_cnt; // variables CURL* hCurl; @@ -143,7 +143,7 @@ class S3fsCurl long LastResponseCode; const unsigned char* postdata; // use by post method and read callback function. int postdata_remaining; // use by post method and read callback function. 
- filepart partdata; // use by multipart upload + filepart partdata; // use by multipart upload/get object callback public: // constructor/destructor @@ -162,9 +162,11 @@ class S3fsCurl static size_t WriteMemoryCallback(void *ptr, size_t blockSize, size_t numBlocks, void *data); static size_t ReadCallback(void *ptr, size_t size, size_t nmemb, void *userp); static size_t UploadReadCallback(void *ptr, size_t size, size_t nmemb, void *userp); + static size_t DownloadWriteCallback(void* ptr, size_t size, size_t nmemb, void* userp); static bool UploadMultipartPostCallback(S3fsCurl* s3fscurl); static S3fsCurl* UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl); + static S3fsCurl* ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl); // methods bool ClearInternalData(void); @@ -186,6 +188,7 @@ class S3fsCurl static bool InitShareCurl(void); static bool DestroyShareCurl(void); static int ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, bool ow_sse_flg); + static int ParallelGetObjectRequest(const char* tpath, int fd, off_t start, ssize_t size); // class methods(valiables) static std::string LookupMimeType(std::string name); @@ -206,7 +209,7 @@ class S3fsCurl static bool IsSetAccessKeyId(void) { return (0 < S3fsCurl::AWSAccessKeyId.size() && 0 < S3fsCurl::AWSSecretAccessKey.size()); } static long SetSslVerifyHostname(long value); static long GetSslVerifyHostname(void) { return S3fsCurl::ssl_verify_hostname; } - static int SetMaxParallelUpload(int value); + static int SetMaxParallelCount(int value); // methods bool CreateCurlHandle(bool force = false); @@ -222,7 +225,8 @@ class S3fsCurl int HeadRequest(const char* tpath, headers_t& meta); int PutHeadRequest(const char* tpath, headers_t& meta, bool ow_sse_flg); int PutRequest(const char* tpath, headers_t& meta, int fd, bool ow_sse_flg); - int GetObjectRequest(const char* tpath, int fd); + int PreGetObjectRequest(const char* tpath, int fd, off_t start, ssize_t size); + int GetObjectRequest(const char* tpath, int fd, off_t start = -1, ssize_t size = -1); int CheckBucket(void); int ListBucketRequest(const char* tpath, const char* query); int MultipartListRequest(std::string& body); diff --git a/src/fdcache.cpp b/src/fdcache.cpp index d0dd277..bb7c789 100644 --- a/src/fdcache.cpp +++ b/src/fdcache.cpp @@ -21,290 +21,1188 @@ #include #include #include +#include +#include #include #include +#include +#include #include #include +#include +#include #include #include #include #include #include +#include +#include "common.h" #include "fdcache.h" #include "s3fs.h" +#include "s3fs_util.h" +#include "curl.h" using namespace std; -//------------------------------------------------------------------- -// Utility for fd_cache_entlist_t -//------------------------------------------------------------------- -static int get_fdlist_entlist(fd_cache_entlist_t* list, fd_list_t& fdlist); -static fd_cache_entlist_t::iterator find_entlist(fd_cache_entlist_t* list, int fd); -static fd_cache_entlist_t::iterator find_writable_fd_entlist(fd_cache_entlist_t* list); -static bool add_fd_entlist(fd_cache_entlist_t* list, int fd, int flags); -static bool erase_fd_entlist(fd_cache_entlist_t* list, int fd, bool force = false); +//------------------------------------------------ +// Symbols +//------------------------------------------------ +#define MAX_OBJECT_SIZE 68719476735LL // 64GB - 1L +#define MULTIPART_LOWLIMIT (20 * 1024 * 1024) // 20MB +#define FDPAGE_SIZE (50 * 1024 * 1024) // 50MB(parallel uploading is 5 parallel(default) * 10 MB) -static int 
get_fdlist_entlist(fd_cache_entlist_t* list, fd_list_t& fdlist) +//------------------------------------------------ +// CacheFileStat class methods +//------------------------------------------------ +bool CacheFileStat::MakeCacheFileStatPath(const char* path, string& sfile_path, bool is_create_dir) { - fd_cache_entlist_t::iterator iter; - int count = 0; + // make stat dir top path( "//..stat" ) + string top_path = FdManager::GetCacheDir(); + top_path += "/."; + top_path += bucket; + top_path += ".stat"; - for(count = 0, iter = list->begin(); list->end() != iter; iter++, count++){ - fdlist.push_back((*iter).fd); + if(is_create_dir){ + mkdirp(top_path + mydirname(path), 0777); } - return count; + if(!path || '\0' == path[0]){ + sfile_path = top_path; + }else{ + sfile_path = top_path + SAFESTRPTR(path); + } + return true; } -static fd_cache_entlist_t::iterator find_entlist(fd_cache_entlist_t* list, int fd) +//------------------------------------------------ +// CacheFileStat methods +//------------------------------------------------ +CacheFileStat::CacheFileStat(const char* tpath) : path(""), fd(-1) { - fd_cache_entlist_t::iterator iter; + if(tpath && '\0' != tpath[0]){ + SetPath(tpath, true); + } +} - for(iter = list->begin(); list->end() != iter; iter++){ - if(fd == (*iter).fd){ +CacheFileStat::~CacheFileStat() +{ + Release(); +} + +bool CacheFileStat::SetPath(const char* tpath, bool is_open) +{ + if(!tpath || '\0' == tpath[0]){ + return false; + } + if(!Release()){ + // could not close old stat file. + return false; + } + if(tpath){ + path = tpath; + } + if(!is_open){ + return true; + } + return Open(); +} + +bool CacheFileStat::Open(void) +{ + if(0 == path.size()){ + return false; + } + if(-1 != fd){ + // already opened + return true; + } + // stat path + string sfile_path; + if(!CacheFileStat::MakeCacheFileStatPath(path.c_str(), sfile_path, true)){ + FGPRINT("CacheFileStat::Open: failed to create cache stat file path(%s)\n", path.c_str()); + SYSLOGERR("failed to create cache stat file path(%s)", path.c_str()); + return false; + } + // open + if(-1 == (fd = open(sfile_path.c_str(), O_CREAT|O_RDWR, 0600))){ + FGPRINT2("CacheFileStat::Open: failed to open cache stat file(%s) - errno(%d)\n", path.c_str(), errno); + //SYSLOGERR("failed to open cache stat file path(%s) - errno(%d)", path.c_str(), errno); + return false; + } + // lock + if(-1 == flock(fd, LOCK_EX)){ + FGPRINT("CacheFileStat::Open: failed to lock cache stat file(%s) - errno(%d)\n", path.c_str(), errno); + SYSLOGERR("failed to lock cache stat file(%s) - errno(%d)", path.c_str(), errno); + close(fd); + fd = -1; + return false; + } + // seek top + if(0 != lseek(fd, 0, SEEK_SET)){ + FGPRINT("CacheFileStat::Open: failed to lseek cache stat file(%s) - errno(%d)\n", path.c_str(), errno); + SYSLOGERR("failed to lseek cache stat file(%s) - errno(%d)", path.c_str(), errno); + flock(fd, LOCK_UN); + close(fd); + fd = -1; + return false; + } + FGPRINT2(" CacheFileStat::Open: file locked(%s - %s)\n", path.c_str(), sfile_path.c_str()); + + return true; +} + +bool CacheFileStat::Release(void) +{ + if(-1 == fd){ + // already release + return true; + } + // unlock + if(-1 == flock(fd, LOCK_UN)){ + FGPRINT("CacheFileStat::Open: failed to unlock cache stat file(%s) - errno(%d)\n", path.c_str(), errno); + SYSLOGERR("failed to unlock cache stat file(%s) - errno(%d)", path.c_str(), errno); + return false; + } + FGPRINT2(" CacheFileStat::Open: file unlocked(%s)\n", path.c_str()); + + if(-1 == close(fd)){ + FGPRINT("CacheFileStat::Open: failed to 
close cache stat file(%s) - errno(%d)\n", path.c_str(), errno); + SYSLOGERR("failed to close cache stat file(%s) - errno(%d)", path.c_str(), errno); + return false; + } + fd = -1; + + return true; +} + +//------------------------------------------------ +// PageList methods +//------------------------------------------------ +void PageList::FreeList(fdpage_list_t& list) +{ + for(fdpage_list_t::iterator iter = list.begin(); iter != list.end(); iter = list.erase(iter)){ + delete (*iter); + } + list.clear(); +} + +PageList::PageList(size_t size, bool is_init) +{ + Init(0, false); +} + +PageList::~PageList() +{ + Clear(); +} + +size_t PageList::Size(void) const +{ + if(0 == pages.size()){ + return 0; + } + fdpage_list_t::const_reverse_iterator riter = pages.rbegin(); + return ((*riter)->offset + (*riter)->bytes); +} + +int PageList::Resize(size_t size, bool is_init) +{ + size_t total = Size(); + + if(0 == total){ + Init(size, is_init); + + }else if(total < size){ + size_t remain = size - total; // remaining bytes + fdpage_list_t::reverse_iterator riter = pages.rbegin(); + + if((*riter)->bytes < FdManager::GetPageSize()){ + // resize last area + remain += (*riter)->bytes; // remaining bytes(without last page) + (*riter)->bytes = remain > FdManager::GetPageSize() ? FdManager::GetPageSize() : remain; // reset page size + remain -= (*riter)->bytes; // remaining bytes(after last page) + (*riter)->init = is_init; + } + + // add new area + for(off_t next = (*riter)->next(); 0 < remain; remain -= size, next += size){ + size = remain > FdManager::GetPageSize() ? FdManager::GetPageSize() : remain; + fdpage* page = new fdpage(next, size, is_init); + pages.push_back(page); + } + + }else if(total > size){ + for(fdpage_list_t::reverse_iterator riter = pages.rbegin(); riter != pages.rend(); riter++){ + if(static_cast((*riter)->offset) < size){ + (*riter)->bytes = size - (*riter)->offset; + break; + } + } + } + return true; +} + +void PageList::Clear(void) +{ + PageList::FreeList(pages); +} + +int PageList::Init(size_t size, bool is_init) +{ + Clear(); + for(size_t total = 0; total < size; total += FdManager::GetPageSize()){ + size_t areasize = (total + FdManager::GetPageSize()) < size ? FdManager::GetPageSize() : (size - total); + fdpage* page = new fdpage(total, areasize, is_init); + pages.push_back(page); + } + return pages.size(); +} + +bool PageList::IsInit(off_t start, size_t size) +{ + off_t next = start + size; + + if(0 == pages.size()){ + return false; + } + // check end + fdpage_list_t::reverse_iterator riter = pages.rbegin(); + if((*riter)->next() < next){ + // size is over end of page list. 
+ return false; + } + for(fdpage_list_t::iterator iter = pages.begin(); iter != pages.end(); iter++){ + if(next <= (*iter)->offset){ break; } - } - return iter; -} - -static fd_cache_entlist_t::iterator find_writable_fd_entlist(fd_cache_entlist_t* list) -{ - fd_cache_entlist_t::iterator iter; - fd_cache_entlist_t::iterator titer; - int flags; - - for(flags = -1, iter = list->begin(), titer = list->end(); list->end() != iter; iter++){ - if(flags < ((*iter).flags & O_ACCMODE)){ - flags = (*iter).flags & O_ACCMODE; - titer = iter; + if((start <= (*iter)->offset && (*iter)->offset < next) || // start < iter-start < end + (start <= (*iter)->end() && (*iter)->end() < next) || // start < iter-end < end + ((*iter)->offset <= start && next <= (*iter)->end()) ) // iter-start < start < end < iter-end + { + if(!(*iter)->init){ + return false; + } } } - return titer; + return true; } -static bool add_fd_entlist(fd_cache_entlist_t* list, int fd, int flags) +bool PageList::SetInit(off_t start, size_t size, bool is_init) { - fd_cache_entlist_t::iterator iter = find_entlist(list, fd); + // check size & resize + if(Size() < (start + size)){ + Resize(start + size, false); + } + + off_t next = start + size; + for(fdpage_list_t::iterator iter = pages.begin(); iter != pages.end(); iter++){ + if((*iter)->end() < start){ + // out of area + // iter:start < iter:end < start < end + continue; + }else if(next <= (*iter)->offset){ + // out of area + // start < end < iter:start < iter:end + break; + } + // area of target overlaps with iter area + // iter:start < start < iter:end < end + // iter:start < start < end < iter:end + // start < iter:start < iter:end < end + // start < iter:start < end < iter:end + if((*iter)->init != is_init){ + (*iter)->init = is_init; + } + } + return true; +} + +bool PageList::FindUninitPage(off_t start, off_t& resstart, size_t& ressize) +{ + for(fdpage_list_t::iterator iter = pages.begin(); iter != pages.end(); iter++){ + if(start <= (*iter)->end()){ + if(!(*iter)->init){ + resstart = (*iter)->offset; + ressize = (*iter)->bytes; + return true; + } + } + } + return false; +} + +int PageList::GetUninitPages(fdpage_list_t& uninit_list, off_t start) +{ + for(fdpage_list_t::iterator iter = pages.begin(); iter != pages.end(); iter++){ + if(start <= (*iter)->end()){ + // after start pos + if(!(*iter)->init){ + // found uninitialized area + fdpage_list_t::reverse_iterator riter = uninit_list.rbegin(); + if(riter != uninit_list.rend() && (*riter)->next() == (*iter)->offset){ + // merge to before page + (*riter)->bytes += (*iter)->bytes; + }else{ + fdpage* page = new fdpage((*iter)->offset, (*iter)->bytes, false); + uninit_list.push_back(page); + } + } + } + } + return uninit_list.size(); +} + +bool PageList::Serialize(CacheFileStat& file, bool is_output) +{ + if(!file.Open()){ + return false; + } + if(is_output){ + // + // put to file + // + stringstream ssall; + ssall << Size(); + + for(fdpage_list_t::iterator iter = pages.begin(); iter != pages.end(); iter++){ + ssall << "\n" << (*iter)->offset << ":" << (*iter)->bytes << ":" << ((*iter)->init ? "1" : "0"); + } + + string strall = ssall.str(); + if(0 >= pwrite(file.GetFd(), strall.c_str(), strall.length(), 0)){ + FGPRINT("PageList::Serialize: failed to write stats(%d)\n", errno); + SYSLOGERR("failed to write stats(%d)", errno); + return false; + } - if(list->end() == iter){ - // not found, add new entry. 
- fd_cache_entry ent; - ent.refcnt = 1; - ent.fd = fd; - ent.flags = flags; - list->push_back(ent); }else{ - // found same fd, need to check flags. - (*iter).refcnt++; - if(flags != (*iter).flags){ - (*iter).flags = flags; + // + // loading from file + // + struct stat st; + memset(&st, 0, sizeof(struct stat)); + if(-1 == fstat(file.GetFd(), &st)){ + FGPRINT("PageList::Serialize: fstat is failed. errno(%d)\n", errno); + SYSLOGERR("fstat is failed. errno(%d)", errno); + return false; + } + if(0 >= st.st_size){ + // nothing + Init(0, false); + return true; + } + char* ptmp; + if(NULL == (ptmp = (char*)calloc(st.st_size + 1, sizeof(char)))){ + FGPRINT("PageList::Serialize: could not allocate memory.\n"); + SYSLOGERR("could not allocate memory."); + S3FS_FUSE_EXIT(); + return false; + } + // read from file + if(0 >= pread(file.GetFd(), ptmp, st.st_size, 0)){ + FGPRINT("PageList::Serialize: failed to read stats(%d)\n", errno); + SYSLOGERR("failed to read stats(%d)", errno); + free(ptmp); + return false; + } + string oneline; + stringstream ssall(ptmp); + free(ptmp); + + // init + Clear(); + + // load(size) + if(!getline(ssall, oneline, '\n')){ + FGPRINT("PageList::Serialize: failed to parse stats.\n"); + SYSLOGERR("failed to parse stats."); + return false; + } + size_t total = static_cast(atoi(oneline.c_str())); + + // load each part + bool is_err = false; + while(getline(ssall, oneline, '\n')){ + string part; + stringstream ssparts(oneline); + // offset + if(!getline(ssparts, part, ':')){ + is_err = true; + break; + } + off_t offset = static_cast(atoi(part.c_str())); + // size + if(!getline(ssparts, part, ':')){ + is_err = true; + break; + } + ssize_t size = static_cast(atoi(part.c_str())); + // init + if(!getline(ssparts, part, ':')){ + is_err = true; + break; + } + bool is_init = (1 == atoi(part.c_str()) ? true : false); + // add new area + SetInit(offset, size, is_init); + } + if(is_err){ + FGPRINT("PageList::Serialize: failed to parse stats.\n"); + SYSLOGERR("failed to parse stats."); + Clear(); + return false; + } + + // check size + if(total != Size()){ + FGPRINT("PageList::Serialize: different size(%zd - %zd).\n", total, Size()); + SYSLOGERR("different size(%zd - %zd).", total, Size()); + Clear(); + return false; } } return true; } -static bool erase_fd_entlist(fd_cache_entlist_t* list, int fd, bool force) +void PageList::Dump(void) { - fd_cache_entlist_t::iterator iter = find_entlist(list, fd); + int cnt = 0; - if(list->end() == iter){ + FGPRINT2(" FdEntity::Dump = {\n"); + for(fdpage_list_t::iterator iter = pages.begin(); iter != pages.end(); iter++, cnt++){ + FGPRINT2(" [%08d] -> {%014zd - %014zd : %s}\n", cnt, (*iter)->offset, (*iter)->bytes, (*iter)->init ? 
"true" : "false"); + } + FGPRINT2(" }\n"); +} + +//------------------------------------------------ +// FdEntity methods +//------------------------------------------------ +FdEntity::FdEntity(const char* tpath, const char* cpath) + : is_lock_init(false), path(SAFESTRPTR(tpath)), cachepath(SAFESTRPTR(cpath)), fd(-1), file(NULL), is_modify(false) +{ + try{ + pthread_mutex_init(&fdent_lock, NULL); + is_lock_init = true; + }catch(exception& e){ + FGPRINT("FdEntity::FdEntity: failed to init mutex\n"); + SYSLOGERR("failed to init mutex"); + } +} + +FdEntity::~FdEntity() +{ + Clear(); + + if(is_lock_init){ + try{ + pthread_mutex_destroy(&fdent_lock); + }catch(exception& e){ + FGPRINT("FdEntity::~FdEntity: failed to destroy mutex\n"); + SYSLOGERR("failed to destroy mutex"); + } + is_lock_init = false; + } +} + +void FdEntity::Clear(void) +{ + AutoLock auto_lock(&fdent_lock); + + if(file){ + { + CacheFileStat cfstat(path.c_str()); + if(!pagelist.Serialize(cfstat, true)){ + FGPRINT("FdEntity::Clear: failed to save cache stat file(%s).\n", path.c_str()); + SYSLOGERR("failed to save cache stat file(%s).", path.c_str()); + } + } + fclose(file); + file = NULL; + fd = -1; + } + pagelist.Init(0, false); + refcnt = 0; + path = ""; + cachepath = ""; + is_modify = false; +} + +void FdEntity::Close(void) +{ + FGPRINT2(" FdEntity::Close[path=%s][fd=%d][refcnt=%d]\n", path.c_str(), fd, (-1 != fd ? refcnt - 1 : refcnt)); + + if(-1 != fd){ + AutoLock auto_lock(&fdent_lock); + + if(0 < refcnt){ + refcnt--; + } + if(0 == refcnt){ + { + CacheFileStat cfstat(path.c_str()); + if(!pagelist.Serialize(cfstat, true)){ + FGPRINT("FdEntity::Close: failed to save cache stat file(%s).\n", path.c_str()); + SYSLOGERR("failed to save cache stat file(%s).", path.c_str()); + } + } + fclose(file); + file = NULL; + fd = -1; + } + } +} + +int FdEntity::Dup(void) +{ + FGPRINT2(" FdEntity::Dup[path=%s][fd=%d][refcnt=%d]\n", path.c_str(), fd, (-1 != fd ? refcnt + 1 : refcnt)); + + if(-1 != fd){ + AutoLock auto_lock(&fdent_lock); + refcnt++; + } + return fd; +} + +int FdEntity::Open(ssize_t size, time_t time) +{ + bool already_opened = false; // already opened fd + bool is_csf_loaded = false; // loaded by cache stat file + bool is_truncate = false; // need to truncate + bool init_value = false; // value for pagelist + + FGPRINT2(" FdEntity::Open[path=%s][fd=%d][size=%zd][time=%zd]\n", path.c_str(), fd, size, time); + + if(-1 != fd){ + // already opened, needs to increment refcnt. + already_opened = true; + + }else{ + // open + if(0 != cachepath.size()){ + // At first, open & flock stat file. + { + CacheFileStat cfstat(path.c_str()); + is_csf_loaded = pagelist.Serialize(cfstat, false); + } + + // open cache file + if(is_csf_loaded && -1 != (fd = open(cachepath.c_str(), O_RDWR))){ + // file exists + struct stat st; + memset(&st, 0, sizeof(struct stat)); + if(-1 == fstat(fd, &st)){ + FGPRINT("FdEntity::Open: fstat is failed. errno(%d)\n", errno); + SYSLOGERR("fstat is failed. errno(%d)", errno); + fclose(file); + file = NULL; + fd = -1; + return (0 == errno ? -EIO : -errno); + } + if(static_cast(st.st_size) != pagelist.Size()){ + is_csf_loaded = false; // reinitializing + if(-1 == size){ + size = st.st_size; + }else{ + is_truncate = true; + } + }else{ + // size OK! --> no initialize after this line. + } + + }else{ + // file does not exist -> create & open + if(-1 == (fd = open(cachepath.c_str(), O_CREAT|O_RDWR|O_TRUNC, 0600))){ + FGPRINT("FdEntity::Open: failed to open file(%s). 
errno(%d)\n", cachepath.c_str(), errno); + SYSLOGERR("failed to open file(%s). errno(%d)", cachepath.c_str(), errno); + return (0 == errno ? -EIO : -errno); + } + if(-1 == size){ + size = 0; + }else{ + is_truncate = true; + } + is_csf_loaded = false; + } + // make file pointer(for being same tmpfile) + if(NULL == (file = fdopen(fd, "wb"))){ + FGPRINT("FdEntity::Open: failed to get fileno(%s). errno(%d)\n", cachepath.c_str(), errno); + SYSLOGERR("failed to get fileno(%s). errno(%d)", cachepath.c_str(), errno); + close(fd); + fd = -1; + return (0 == errno ? -EIO : -errno); + } + + }else{ + // open temporary file + if(NULL == (file = tmpfile()) || -1 ==(fd = fileno(file))){ + FGPRINT("FdEntity::Open: failed to open tmp file. err(%d)\n", errno); + SYSLOGERR("failed to open tmp file. err(%d)", errno); + if(file){ + fclose(file); + file = NULL; + } + return (0 == errno ? -EIO : -errno); + } + if(-1 == size){ + size = 0; + }else{ + is_truncate = true; + } + } + } + + // truncate + if(is_truncate){ + if(0 != ftruncate(fd, size) || 0 != fsync(fd)){ + FGPRINT("FdEntity::Open: ftruncate(%s) or fsync returned err(%d)\n", cachepath.c_str(), errno); + SYSLOGERR("ftruncate(%s) or fsync returned err(%d)", cachepath.c_str(), errno); + fclose(file); + file = NULL; + fd = -1; + return (0 == errno ? -EIO : -errno); + } + } + + // set mtime + if(-1 != time){ + if(0 != SetMtime(time)){ + FGPRINT("FdEntity::Open: failed to set mtime. errno(%d)\n", errno); + SYSLOGERR("failed to set mtime. errno(%d)", errno); + fclose(file); + file = NULL; + fd = -1; + return (0 == errno ? -EIO : -errno); + } + } + + // set internal data + if(already_opened){ + Dup(); + }else{ + if(!is_csf_loaded){ + pagelist.Init(size, init_value); + } + refcnt = 1; + is_modify = false; + } + return 0; +} + +int FdEntity::SetMtime(time_t time) +{ + FGPRINT2(" FdEntity::SetMtime[path=%s][fd=%d][time=%zd]\n", path.c_str(), fd, time); + + if(-1 == time){ + return 0; + } + if(-1 != fd){ + AutoLock auto_lock(&fdent_lock); + + struct timeval tv[2]; + tv[0].tv_sec = time; + tv[0].tv_usec= 0L; + tv[1].tv_sec = tv[0].tv_sec; + tv[1].tv_usec= 0L; + if(-1 == futimes(fd, tv)){ + FGPRINT("FdEntity::Set: futimes failed. errno(%d)\n", errno); + SYSLOGERR("futimes failed. errno(%d)", errno); + return -errno; + } + }else if(0 < cachepath.size()){ + // not opened file yet. + struct utimbuf n_mtime; + n_mtime.modtime = time; + n_mtime.actime = time; + if(-1 == utime(cachepath.c_str(), &n_mtime)){ + //FGPRINT("FdEntity::Set: utime failed. errno(%d)\n", errno); + //SYSLOGERR("utime failed. 
errno(%d)", errno); + return -errno; + } + } + return 0; +} + +bool FdEntity::GetSize(size_t& size) +{ + if(-1 == fd){ return false; } - (*iter).refcnt--; - if(!force && 0 < (*iter).refcnt){ - return false; - } - list->erase(iter); + AutoLock auto_lock(&fdent_lock); + + size = pagelist.Size(); return true; } -//------------------------------------------------------------------- -// Static -//------------------------------------------------------------------- -FdCache FdCache::singleton; -pthread_mutex_t FdCache::fd_cache_lock; - -//------------------------------------------------------------------- -// Constructor/Destructor -//------------------------------------------------------------------- -FdCache::FdCache() +bool FdEntity::GetMtime(time_t& time) { - if(this == FdCache::getFdCacheData()){ - pthread_mutex_init(&(FdCache::fd_cache_lock), NULL); + struct stat st; + + if(!GetStats(st)){ + return false; + } + time = st.st_mtime; + return true; +} + +bool FdEntity::GetStats(struct stat& st) +{ + if(-1 == fd){ + return false; + } + AutoLock auto_lock(&fdent_lock); + + memset(&st, 0, sizeof(struct stat)); + if(-1 == fstat(fd, &st)){ + FGPRINT("FdEntity::GetStats: fstat failed. errno(%d)\n", errno); + SYSLOGERR("fstat failed. errno(%d)", errno); + return false; + } + return true; +} + +bool FdEntity::SetAllStatus(bool is_enable) +{ + FGPRINT2(" FdEntity::SetAllStatus[path=%s][fd=%d][%s]\n", path.c_str(), fd, is_enable ? "enable" : "disable"); + + if(-1 == fd){ + return false; + } + AutoLock auto_lock(&fdent_lock); + + // get file size + struct stat st; + memset(&st, 0, sizeof(struct stat)); + if(-1 == fstat(fd, &st)){ + FGPRINT("FdEntity::SetAllEnable: fstat is failed. errno(%d)\n", errno); + SYSLOGERR("fstat is failed. errno(%d)", errno); + return false; + } + // Reinit + pagelist.Init(st.st_size, is_enable); + + return true; +} + +int FdEntity::Load(off_t start, ssize_t size) +{ + int result = 0; + + FGPRINT2(" FdEntity::Load[path=%s][fd=%d][offset=%zd][size=%zd]\n", path.c_str(), fd, start, size); + + if(-1 == fd){ + return -EBADF; + } + AutoLock auto_lock(&fdent_lock); + + // check loaded area & load + fdpage_list_t uninit_list; + if(0 < pagelist.GetUninitPages(uninit_list, start)){ + for(fdpage_list_t::iterator iter = uninit_list.begin(); iter != uninit_list.end(); iter++){ + if(-1 != size && (start + size) <= (*iter)->offset){ + break; + } + // download + if((*iter)->bytes >= MULTIPART_LOWLIMIT && !nomultipart){ // 20MB + // parallel request + // Additional time is needed for large files + time_t backup = 0; + if(120 > S3fsCurl::GetReadwriteTimeout()){ + backup = S3fsCurl::SetReadwriteTimeout(120); + } + result = S3fsCurl::ParallelGetObjectRequest(path.c_str(), fd, (*iter)->offset, (*iter)->bytes); + if(0 != backup){ + S3fsCurl::SetReadwriteTimeout(backup); + } + }else{ + // single request + S3fsCurl s3fscurl; + result = s3fscurl.GetObjectRequest(path.c_str(), fd, (*iter)->offset, (*iter)->bytes); + } + if(0 != result){ + break; + } + + // Set init flag + pagelist.SetInit((*iter)->offset, (*iter)->bytes, true); + } + PageList::FreeList(uninit_list); + } + return result; +} + +bool FdEntity::LoadFull(size_t* size, bool force_load) +{ + int result; + + FGPRINT2(" FdEntity::LoadFull[path=%s][fd=%d]\n", path.c_str(), fd); + + if(-1 == fd){ + if(0 != Open()){ + return false; + } + } + if(force_load){ + SetAllDisable(); + } + // + // TODO: possibly do background for delay loading + // + if(0 != (result = Load(0, pagelist.Size()))){ + FGPRINT("FdEntity::FullDownload: could not download, 
result(%d)\n", result); + SYSLOGERR("could not download, result(%d)", result); + return false; + } + if(is_modify){ + AutoLock auto_lock(&fdent_lock); + is_modify = false; + } + if(size){ + *size = pagelist.Size(); + } + return true; +} + +int FdEntity::RowFlush(const char* tpath, headers_t& meta, bool ow_sse_flg, bool force_sync) +{ + int result; + + FGPRINT2(" FdEntity::RowFlush[tpath=%s][path=%s][fd=%d]\n", SAFESTRPTR(tpath), path.c_str(), fd); + + if(-1 == fd){ + return -EBADF; + } + AutoLock auto_lock(&fdent_lock); + + if(!force_sync && !is_modify){ + // nothing to update. + return 0; + } + + /* + * Make decision to do multi upload (or not) based upon file size + * + * According to the AWS spec: + * - 1 to 10,000 parts are allowed + * - minimum size of parts is 5MB (expect for the last part) + * + * For our application, we will define part size to be 10MB (10 * 2^20 Bytes) + * maximum file size will be ~64 GB - 2 ** 36 + * + * Initially uploads will be done serially + * + * If file is > 20MB, then multipart will kick in + */ + if(pagelist.Size() > MAX_OBJECT_SIZE){ // 64GB - 1 + // close f ? + return -ENOTSUP; + } + + // seek to head of file. + if(0 != lseek(fd, 0, SEEK_SET)){ + FGPRINT("FdEntity::RowFlush: lseek error(%d)\n", errno); + SYSLOGERR("lseek error(%d)\n", errno); + return -errno; + } + + if(pagelist.Size() >= MULTIPART_LOWLIMIT && !nomultipart){ // 20MB + // Additional time is needed for large files + time_t backup = 0; + if(120 > S3fsCurl::GetReadwriteTimeout()){ + backup = S3fsCurl::SetReadwriteTimeout(120); + } + result = S3fsCurl::ParallelMultipartUploadRequest(tpath ? tpath : path.c_str(), meta, fd, ow_sse_flg); + if(0 != backup){ + S3fsCurl::SetReadwriteTimeout(backup); + } + }else{ + S3fsCurl s3fscurl; + result = s3fscurl.PutRequest(tpath ? tpath : path.c_str(), meta, fd, ow_sse_flg); + } + + // seek to head of file. + if(0 == result && 0 != lseek(fd, 0, SEEK_SET)){ + FGPRINT("FdEntity::RowFlush: lseek error(%d)\n", errno); + SYSLOGERR("lseek error(%d)\n", errno); + return -errno; + } + + if(0 == result){ + is_modify = false; + } + return result; +} + +ssize_t FdEntity::Read(char* bytes, off_t start, size_t size, bool force_load) +{ + int result; + ssize_t rsize; + + FGPRINT2(" FdEntity::Read[path=%s][fd=%d][offset=%zd][size=%zd]\n", path.c_str(), fd, start, size); + + if(-1 == fd){ + return -EBADF; + } + if(force_load){ + AutoLock auto_lock(&fdent_lock); + pagelist.SetInit(start, size, false); + } + // Loading + if(0 != (result = Load(start, size))){ + FGPRINT("FdEntity::Read: could not download. start(%zd), size(%zd), errno(%d)\n", start, size, result); + SYSLOGERR("could not download. start(%zd), size(%zd), errno(%d)", start, size, result); + return -EIO; + } + // Reading + { + AutoLock auto_lock(&fdent_lock); + + if(-1 == (rsize = pread(fd, bytes, size, start))){ + FGPRINT("FdEntity::Read: pread failed. errno(%d)\n", errno); + SYSLOGERR("pread failed. errno(%d)", errno); + return -errno; + } + } + return rsize; +} + +ssize_t FdEntity::Write(const char* bytes, off_t start, size_t size) +{ + int result; + ssize_t wsize; + + FGPRINT2(" FdEntity::Write[path=%s][fd=%d][offset=%zd][size=%zd]\n", path.c_str(), fd, start, size); + + if(-1 == fd){ + return -EBADF; + } + + // Load unitialized area which starts from 0 to (start + size) before writing. 
+ if(0 != (result = Load(0, start))){ + FGPRINT("FdEntity::Write: failed to load uninitialized area before writing(errno=%d)\n", result); + SYSLOGERR("failed to load uninitialized area before writing(errno=%d)", result); + return static_cast(result); + } + + // Writing + { + AutoLock auto_lock(&fdent_lock); + + if(-1 == (wsize = pwrite(fd, bytes, size, start))){ + FGPRINT("FdEntity::Write: pwrite failed. errno(%d)\n", errno); + SYSLOGERR("pwrite failed. errno(%d)", errno); + return -errno; + } + if(!is_modify){ + is_modify = true; + } + if(0 < wsize){ + pagelist.SetInit(start, wsize, true); + } + } + return wsize; +} + +//------------------------------------------------ +// FdManager class valiable +//------------------------------------------------ +FdManager FdManager::singleton; +pthread_mutex_t FdManager::fd_manager_lock; +bool FdManager::is_lock_init(false); +string FdManager::cache_dir(""); +size_t FdManager::page_size(FDPAGE_SIZE); + +//------------------------------------------------ +// FdManager class methods +//------------------------------------------------ +bool FdManager::SetCacheDir(const char* dir) +{ + if(!dir || '\0' == dir[0]){ + cache_dir = ""; + }else{ + cache_dir = dir; + } + return true; +} + +size_t FdManager::SetPageSize(size_t size) +{ + // If already has entries, this function is failed. + if(0 < FdManager::get()->fent.size()){ + return -1; + } + size_t old = FdManager::page_size; + FdManager::page_size = size; + return old; +} + +bool FdManager::DeleteCacheDirectory(void) +{ + if(0 == FdManager::cache_dir.size()){ + return true; + } + string cache_dir; + if(!FdManager::MakeCachePath(NULL, cache_dir, false)){ + return false; + } + return delete_files_in_dir(cache_dir.c_str(), true); +} + +int FdManager::DeleteCacheFile(const char* path) +{ + FGPRINT2(" FdManager::DeleteCacheFile[path=%s]\n", SAFESTRPTR(path)); + + if(!path){ + return -EIO; + } + if(0 == FdManager::cache_dir.size()){ + return 0; + } + string cache_path = ""; + if(!FdManager::MakeCachePath(path, cache_path, false)){ + return 0; + } + if(0 != unlink(cache_path.c_str())){ + //FGPRINT("FdManager::DeleteCacheFile: failed to delete file(%s): errno=%d\n", path, errno); + //SYSLOGERR("failed to delete file(%s): errno=%d", path, errno); + return -errno; + } + return 0; +} + +bool FdManager::MakeCachePath(const char* path, string& cache_path, bool is_create_dir) +{ + if(0 == FdManager::cache_dir.size()){ + cache_path = ""; + return true; + } + string resolved_path(FdManager::cache_dir + "/" + bucket); + if(is_create_dir){ + mkdirp(resolved_path + mydirname(path), 0777); + } + if(!path || '\0' == path[0]){ + cache_path = resolved_path; + }else{ + cache_path = resolved_path + SAFESTRPTR(path); + } + return true; +} + +//------------------------------------------------ +// FdManager methods +//------------------------------------------------ +FdManager::FdManager() +{ + if(this == FdManager::get()){ + try{ + pthread_mutex_init(&FdManager::fd_manager_lock, NULL); + FdManager::is_lock_init = true; + }catch(exception& e){ + FdManager::is_lock_init = false; + FGPRINT("FdManager::FdManager: failed to init mutex\n"); + SYSLOGERR("failed to init mutex"); + } }else{ assert(false); } } -FdCache::~FdCache() +FdManager::~FdManager() { - if(this == FdCache::getFdCacheData()){ - pthread_mutex_destroy(&(FdCache::fd_cache_lock)); - - for(fd_cache_t::iterator iter = fd_cache.begin(); fd_cache.end() != iter; iter++){ - fd_cache_entlist_t* entlist = (*iter).second; - delete entlist; + if(this == FdManager::get()){ + 
for(fdent_map_t::iterator iter = fent.begin(); fent.end() != iter; iter++){ + FdEntity* ent = (*iter).second; + delete ent; + } + fent.clear(); + + if(FdManager::is_lock_init){ + try{ + pthread_mutex_destroy(&FdManager::fd_manager_lock); + }catch(exception& e){ + FGPRINT("FdManager::FdManager: failed to init mutex\n"); + SYSLOGERR("failed to init mutex"); + } + FdManager::is_lock_init = false; } - fd_cache.clear(); }else{ assert(false); } } -//------------------------------------------------------------------- -// Methods -//------------------------------------------------------------------- -bool FdCache::Add(const char* path, int fd, int flags) +FdEntity* FdManager::GetFdEntity(const char* path) { - fd_cache_t::iterator iter; - string strkey = path; + FGPRINT2(" FdManager::GetFdEntity[path=%s]\n", SAFESTRPTR(path)); - FGPRINT(" FdCache::Add[path=%s] fd(%d),flags(%d)\n", path, fd, flags); + if(!path || '\0' == path[0]){ + return NULL; + } + AutoLock auto_lock(&FdManager::fd_manager_lock); - pthread_mutex_lock(&FdCache::fd_cache_lock); + fdent_map_t::iterator iter = fent.find(string(path)); + if(fent.end() == iter){ + return NULL; + } + return (*iter).second; +} + +FdEntity* FdManager::Open(const char* path, ssize_t size, time_t time, bool force_tmpfile, bool is_create) +{ + FdEntity* ent; + + FGPRINT2(" FdManager::Open[path=%s][size=%zd][time=%zd]\n", SAFESTRPTR(path), size, time); + + if(!path || '\0' == path[0]){ + return NULL; + } + + AutoLock auto_lock(&FdManager::fd_manager_lock); + + fdent_map_t::iterator iter = fent.find(string(path)); + if(fent.end() != iter){ + // found + ent = (*iter).second; + + }else if(is_create){ + // not found + string cache_path = ""; + if(!force_tmpfile && !FdManager::MakeCachePath(path, cache_path, true)){ + FGPRINT("FdManager::GetFd: failed to make cache path for object(%s).\n", path); + SYSLOGERR("failed to make cache path for object(%s).", path); + return NULL; + } + // make new obj + ent = new FdEntity(path, cache_path.c_str()); + fent[string(path)] = ent; - // Add path->fd - fd_cache_entlist_t* entlist; - if(fd_cache.end() != (iter = fd_cache.find(strkey))){ - // found same key. set into fd(or over write) - entlist = (*iter).second; }else{ - // not found, set into new entry. - entlist = new fd_cache_entlist_t(); - fd_cache[strkey] = entlist; + return NULL; } - add_fd_entlist(entlist, fd, flags); - // Set fd->flags - fd_flags[fd] = flags; - - pthread_mutex_unlock(&FdCache::fd_cache_lock); - - return true; + // open + if(-1 == ent->Open(size, time)){ + return NULL; + } + return ent; } -// Specified path(+pid) and fd are removed from fd_cache and fd_flags. -bool FdCache::Del(const char* path, int fd) +bool FdManager::Close(FdEntity* ent) { - fd_cache_t::iterator fd_iter; - fd_flags_t::iterator flags_iter; - string strkey = path; + FGPRINT2(" FdManager::Close[ent->file=%s][ent->fd=%d]\n", ent ? ent->GetPath() : "", ent ? 
ent->GetFd() : -1); - FGPRINT(" FdCache::Del[path=%s][fd=%d]\n", path, fd); + AutoLock auto_lock(&FdManager::fd_manager_lock); - pthread_mutex_lock(&FdCache::fd_cache_lock); - - // Delete path->fd - if(fd_cache.end() != (fd_iter = fd_cache.find(strkey))){ - fd_cache_entlist_t* entlist = (*fd_iter).second; - erase_fd_entlist(entlist, fd); - if(0 == entlist->size()){ - delete entlist; - fd_cache.erase(fd_iter); - } - } - - // Delete fd->flags - if(fd_flags.end() != (flags_iter = fd_flags.find(fd))){ - fd_flags.erase(flags_iter); - } - - pthread_mutex_unlock(&FdCache::fd_cache_lock); - - return true; -} - -// Specified path(+pid) is removed from fd_cache. -// And if can, fd_cache[path].fd is removed from fd_flags. -bool FdCache::Del(const char* path) -{ - fd_cache_t::iterator fd_iter; - string strkey = path; - - FGPRINT(" FdCache::Del[path=%s]\n", path); - - pthread_mutex_lock(&FdCache::fd_cache_lock); - - if(fd_cache.end() != (fd_iter = fd_cache.find(strkey))){ - fd_cache_entlist_t* entlist = (*fd_iter).second; - fd_list_t fdlist; - if(0 != get_fdlist_entlist(entlist, fdlist)){ - // remove fd->flags map - for(fd_list_t::iterator fdlist_iter; fdlist.end() != fdlist_iter; fdlist_iter++){ - Del(*fdlist_iter); + for(fdent_map_t::iterator iter = fent.begin(); iter != fent.end(); iter++){ + if((*iter).second == ent){ + ent->Close(); + if(!ent->IsOpen()){ + delete (*iter).second; + fent.erase(iter); + return true; } } - // remove path->fd_entlist - delete entlist; - fd_cache.erase(fd_iter); } - pthread_mutex_unlock(&FdCache::fd_cache_lock); - - return true; -} - -// Only fd is removed from fd_flags. -bool FdCache::Del(int fd) -{ - fd_flags_t::iterator flags_iter; - - FGPRINT(" FdCache::Del[fd=%d]\n", fd); - - pthread_mutex_lock(&FdCache::fd_cache_lock); - // Delete fd->flags - if(fd_flags.end() != (flags_iter = fd_flags.find(fd))){ - fd_flags.erase(flags_iter); - } - pthread_mutex_unlock(&FdCache::fd_cache_lock); - - return true; -} - -bool FdCache::Get(const char* path, int* pfd, int* pflags) const -{ - fd_cache_t::const_iterator iter; - bool result = false; - string strkey = path; - - pthread_mutex_lock(&FdCache::fd_cache_lock); - - if(fd_cache.end() != (iter = fd_cache.find(strkey))){ - fd_cache_entlist_t* entlist = (*iter).second; - fd_cache_entlist_t::iterator titer = find_writable_fd_entlist(entlist); - if(titer != entlist->end()){ - // returns writable fd. 
- result = true; - if(pfd){ - *pfd = (*titer).fd; - } - if(pflags){ - *pflags = (*titer).flags; - } - FGPRINT(" FdCache::Get[path=%s] fd=%d,flags=%d\n", path, (*titer).fd, (*titer).flags); - } - } - pthread_mutex_unlock(&FdCache::fd_cache_lock); - - return result; -} - -bool FdCache::Get(int fd, int* pflags) const -{ - bool result = true; - fd_flags_t::const_iterator iter; - - pthread_mutex_lock(&FdCache::fd_cache_lock); - if(fd_flags.end() != (iter = fd_flags.find(fd))){ - if(pflags){ - *pflags = (*iter).second; - } - FGPRINT(" FdCache::Get[fd=%d] flags=%d\n", fd, (*iter).second); - }else{ - result = false; - } - pthread_mutex_unlock(&FdCache::fd_cache_lock); - - return result; + return false; } diff --git a/src/fdcache.h b/src/fdcache.h index 9ce8f81..4c6b7a7 100644 --- a/src/fdcache.h +++ b/src/fdcache.h @@ -1,50 +1,154 @@ #ifndef FD_CACHE_H_ #define FD_CACHE_H_ -#include "common.h" - -// -// Struct for fuse file handle cache -// -struct fd_cache_entry { - int refcnt; - int fd; - int flags; - - fd_cache_entry() : refcnt(0), fd(0), flags(0) {} -}; - -typedef std::list fd_list_t; -typedef std::list fd_cache_entlist_t; -typedef std::map fd_cache_t; // key=path, value=* -typedef std::map fd_flags_t; // key=file discriptor, value=flags - -// -// Class for fuse file handle cache -// -class FdCache +//------------------------------------------------ +// CacheFileStat +//------------------------------------------------ +class CacheFileStat { private: - static FdCache singleton; - static pthread_mutex_t fd_cache_lock; - fd_cache_t fd_cache; - fd_flags_t fd_flags; + std::string path; + int fd; + + private: + static bool MakeCacheFileStatPath(const char* path, std::string& sfile_path, bool is_create_dir = true); public: - FdCache(); - ~FdCache(); + CacheFileStat(const char* tpath = NULL); + ~CacheFileStat(); + + bool Open(void); + bool Release(void); + bool SetPath(const char* tpath, bool is_open = true); + int GetFd(void) const { return fd; } +}; + +//------------------------------------------------ +// fdpage & PageList +//------------------------------------------------ +// page block information +struct fdpage +{ + off_t offset; + size_t bytes; + bool init; + + fdpage(off_t start = 0, size_t size = 0, bool is_init = false) + : offset(start), bytes(size), init(is_init) {} + + off_t next(void) const { return (offset + bytes); } + off_t end(void) const { return (0 < bytes ? 
offset + bytes - 1 : 0); } +}; +typedef std::list fdpage_list_t; + +// +// Management of loading area/modifying +// +class PageList +{ + private: + fdpage_list_t pages; + + private: + void Clear(void); + + public: + static void FreeList(fdpage_list_t& list); + + PageList(size_t size = 0, bool is_init = false); + ~PageList(); + + size_t Size(void) const; + int Resize(size_t size, bool is_init); + int Init(size_t size, bool is_init); + bool IsInit(off_t start, size_t size); + bool SetInit(off_t start, size_t size, bool is_init = true); + bool FindUninitPage(off_t start, off_t& resstart, size_t& ressize); + int GetUninitPages(fdpage_list_t& uninit_list, off_t start = 0); + bool Serialize(CacheFileStat& file, bool is_output); + void Dump(void); +}; + +//------------------------------------------------ +// class FdEntity +//------------------------------------------------ +class FdEntity +{ + private: + pthread_mutex_t fdent_lock; + bool is_lock_init; + PageList pagelist; + int refcnt; // reference count + std::string path; // object path + std::string cachepath; // local cache file path + int fd; // file discriptor(tmp file or cache file) + FILE* file; // file pointer(tmp file or cache file) + bool is_modify; // if file is changed, this flag is true + + private: + void Clear(void); + int Dup(void); + bool SetAllStatus(bool is_enable); + + public: + FdEntity(const char* tpath = NULL, const char* cpath = NULL); + ~FdEntity(); + + void Close(void); + bool IsOpen(void) const { return (-1 != fd); } + int Open(ssize_t size = -1, time_t time = -1); + const char* GetPath(void) const { return path.c_str(); } + int GetFd(void) const { return fd; } + int SetMtime(time_t time); + bool GetSize(size_t& size); + bool GetMtime(time_t& time); + bool GetStats(struct stat& st); + + bool SetAllEnable(void) { return SetAllStatus(true); } + bool SetAllDisable(void) { return SetAllStatus(false); } + bool LoadFull(size_t* size = NULL, bool force_load = false); + int Load(off_t start, ssize_t size); + int RowFlush(const char* tpath, headers_t& meta, bool ow_sse_flg, bool force_sync = false); + int Flush(headers_t& meta, bool ow_sse_flg, bool force_sync = false) { return RowFlush(NULL, meta, ow_sse_flg, force_sync); } + ssize_t Read(char* bytes, off_t start, size_t size, bool force_load = false); + ssize_t Write(const char* bytes, off_t start, size_t size); +}; +typedef std::map fdent_map_t; // key=path, value=FdEntity* + +//------------------------------------------------ +// class FdManager +//------------------------------------------------ +class FdManager +{ + private: + static FdManager singleton; + static pthread_mutex_t fd_manager_lock; + static bool is_lock_init; + static std::string cache_dir; + static size_t page_size; + + fdent_map_t fent; + + public: + FdManager(); + ~FdManager(); // Reference singleton - static FdCache* getFdCacheData(void) { - return &singleton; - } + static FdManager* get(void) { return &singleton; } - bool Add(const char* path, int fd, int flags); - bool Del(const char* path, int fd); - bool Del(const char* path); - bool Del(int fd); - bool Get(const char* path, int* pfd = NULL, int* pflags = NULL) const; - bool Get(int fd, int* pflags = NULL) const; + static bool DeleteCacheDirectory(void); + static int DeleteCacheFile(const char* path); + static bool SetCacheDir(const char* dir); + static bool IsCacheDir(void) { return (0 < FdManager::cache_dir.size()); } + static const char* GetCacheDir(void) { return FdManager::cache_dir.c_str(); } + static size_t SetPageSize(size_t size); + static 
size_t GetPageSize(void) { return FdManager::page_size; } + static bool MakeCachePath(const char* path, std::string& cache_path, bool is_create_dir = true); + + FdEntity* GetFdEntity(const char* path); + FdEntity* Open(const char* path, ssize_t size = -1, time_t time = -1, bool force_tmpfile = false, bool is_create = true); + FdEntity* ExistOpen(const char* path) { return Open(path, -1, -1, false, false); } + bool Close(FdEntity* ent); }; #endif // FD_CACHE_H_ diff --git a/src/s3fs.cpp b/src/s3fs.cpp index 2f1b5a4..aafd78a 100644 --- a/src/s3fs.cpp +++ b/src/s3fs.cpp @@ -63,14 +63,13 @@ using namespace std; #define IS_REPLACEDIR(type) (DIRTYPE_OLD == type || DIRTYPE_FOLDER == type || DIRTYPE_NOOBJ == type) #define IS_RMTYPEDIR(type) (DIRTYPE_OLD == type || DIRTYPE_FOLDER == type) -#define MAX_OBJECT_SIZE 68719476735LL // 64GB - 1L -#define MULTIPART_LOWLIMIT (20 * 1024 * 1024) // 20MB - //------------------------------------------------------------------- // Global valiables //------------------------------------------------------------------- -bool debug = 0; -bool foreground = 0; +bool debug = false; +bool foreground = false; +bool foreground2 = false; +bool nomultipart = false; std::string program_name; std::string service_path = "/"; std::string host = "http://s3.amazonaws.com"; @@ -83,7 +82,6 @@ static mode_t root_mode = 0; static std::string mountpoint; static std::string passwd_file = ""; static bool utility_mode = false; -static bool nomultipart = false; static bool noxmlns = false; static bool nocopyapi = false; static bool norenameapi = false; @@ -93,9 +91,7 @@ static uid_t s3fs_uid = 0; // default = root. static gid_t s3fs_gid = 0; // default = root. static bool is_s3fs_umask = false;// default does not set. static mode_t s3fs_umask = 0; - -// if .size()==0 then local file cache is disabled -static std::string use_cache; +static bool is_remove_cache = false; // mutex static pthread_mutex_t *mutex_buf = NULL; @@ -109,8 +105,7 @@ static int get_object_attribute(const char* path, struct stat* pstbuf, headers_t static int check_object_access(const char* path, int mask, struct stat* pstbuf); static int check_object_owner(const char* path, struct stat* pstbuf); static int check_parent_object_access(const char* path, int mask); -static int get_opened_fd(const char* path); -static int get_local_fd(const char* path); +static FdEntity* get_local_fent(const char* path, bool is_load = false); static bool multi_head_callback(S3fsCurl* s3fscurl); static S3fsCurl* multi_head_retry_callback(S3fsCurl* s3fscurl); static int readdir_multi_head(const char* path, S3ObjList& head); @@ -126,8 +121,6 @@ static xmlChar* get_prefix(const char* xml); static xmlChar* get_next_marker(const char* xml); static char* get_object_name(xmlDocPtr doc, xmlNodePtr node, const char* path); static int put_headers(const char* path, headers_t& meta, bool ow_sse_flg); -static int put_multipart_headers(const char* path, headers_t& meta, bool ow_sse_flg); -static int put_local_fd(const char* path, headers_t meta, int fd, bool ow_sse_flg); static int rename_large_object(const char* from, const char* to); static int create_file_object(const char* path, mode_t mode, uid_t uid, gid_t gid); static int create_directory_object(const char* path, mode_t mode, time_t time, uid_t uid, gid_t gid); @@ -135,7 +128,6 @@ static int rename_object(const char* from, const char* to); static int rename_object_nocopy(const char* from, const char* to); static int clone_directory_object(const char* from, const char* to); static int 
rename_directory(const char* from, const char* to); -static int get_flags(int fd); static void locking_function(int mode, int n, const char* file, int line); static unsigned long id_function(void); static int remote_mountpath_exists(const char* path); @@ -593,97 +585,35 @@ static int check_parent_object_access(const char* path, int mask) return 0; } -// Get fd in mapping data by path -static int get_opened_fd(const char* path) +static FdEntity* get_local_fent(const char* path, bool is_load) { - int fd = -1; - - if(FdCache::getFdCacheData()->Get(path, &fd)){ - FGPRINT(" get_opened_fd: found fd [path=%s] [fd=%d]\n", path, fd); - } - return fd; -} - -static int get_local_fd(const char* path) -{ - int fd = -1; - int result; - struct stat st; struct stat stobj; - string resolved_path(use_cache + "/" + bucket); - string cache_path(resolved_path + path); + FdEntity* ent; - FGPRINT(" get_local_fd[path=%s]\n", path); + FGPRINT(" get_local_fent[path=%s]\n", path); - if(0 != (result = get_object_attribute(path, &stobj))){ - return result; + if(0 != get_object_attribute(path, &stobj)){ + return NULL; } - if(use_cache.size() > 0){ - fd = open(cache_path.c_str(), O_RDWR); // ### TODO should really somehow obey flags here - if(fd != -1){ - if((fstat(fd, &st)) == -1){ - FGPRINT(" get_local_fd: fstat is failed. errno(%d)\n", errno); - SYSLOGERR("fstat is failed. errno(%d)", errno); - close(fd); - return -errno; - } - // if the local and remote mtime/size - // do not match we have an invalid cache entry - if(st.st_size != stobj.st_size || st.st_mtime != stobj.st_mtime){ - if(close(fd) == -1){ - FGPRINT(" get_local_fd: close is failed. errno(%d)\n", errno); - SYSLOGERR("close is failed. errno(%d)", errno); - return -errno; - } - fd = -1; - } - } + // open + time_t mtime = (!S_ISREG(stobj.st_mode) || S_ISLNK(stobj.st_mode)) ? -1 : stobj.st_mtime; + bool force_tmpfile = S_ISREG(stobj.st_mode) ? false : true; + + if(NULL == (ent = FdManager::get()->Open(path, stobj.st_size, mtime, force_tmpfile, true))){ + FGPRINT(" get_local_fent: Coult not open file. errno(%d)\n", errno); + SYSLOGERR("Coult not open file. errno(%d)", errno); + return NULL; + } + // load + if(is_load && !ent->LoadFull()){ + FGPRINT(" get_local_fent: Coult not load file. errno(%d)\n", errno); + SYSLOGERR("Coult not load file. errno(%d)", errno); + FdManager::get()->Close(ent); + return NULL; } - // need to download? - if(fd == -1){ - if(use_cache.size() > 0){ - // only download files, not folders - if(S_ISREG(stobj.st_mode)){ - mkdirp(resolved_path + mydirname(path), 0777); - fd = open(cache_path.c_str(), O_CREAT|O_RDWR|O_TRUNC, stobj.st_mode); - }else{ - // its a folder; do *not* create anything in local cache... - // TODO: do this in a better way) - fd = fileno(tmpfile()); - } - }else{ - fd = fileno(tmpfile()); - } - if(fd == -1){ - FGPRINT(" get_local_fd: Coult not open tmpolary file. errno(%d)\n", errno); - SYSLOGERR("Coult not open tmpolary file. errno(%d)", errno); - return -errno; - } - - // Download - S3fsCurl s3fscurl; - if(0 != (result = s3fscurl.GetObjectRequest(path, fd))){ - return result; - } - - if(S_ISREG(stobj.st_mode) && !S_ISLNK(stobj.st_mode)){ - // make the file's mtime match that of the file on s3 - // if fd is tmpfile, but we force tor set mtime. - struct timeval tv[2]; - tv[0].tv_sec = stobj.st_mtime; - tv[0].tv_usec= 0L; - tv[1].tv_sec = tv[0].tv_sec; - tv[1].tv_usec= 0L; - if(-1 == futimes(fd, tv)){ - FGPRINT(" get_local_fd: futimes failed. errno(%d)\n", errno); - SYSLOGERR("futimes failed. 
errno(%d)", errno); - return -errno; - } - } - } - return fd; + return ent; } /** @@ -693,7 +623,8 @@ static int get_local_fd(const char* path) */ static int put_headers(const char* path, headers_t& meta, bool ow_sse_flg) { - int result; + int result; + S3fsCurl s3fscurl; struct stat buf; FGPRINT(" put_headers[path=%s]\n", path); @@ -704,156 +635,36 @@ static int put_headers(const char* path, headers_t& meta, bool ow_sse_flg) get_object_attribute(path, &buf); if(buf.st_size >= FIVE_GB){ - return(put_multipart_headers(path, meta, ow_sse_flg)); - } - - S3fsCurl s3fscurl; - if(0 != (result = s3fscurl.PutHeadRequest(path, meta, ow_sse_flg))){ - return result; - } - - // Update mtime in local file cache. - int fd; - time_t mtime = get_mtime(meta); - if(0 <= (fd = get_opened_fd(path))){ - // The file already is opened, so update fd before close(flush); - struct timeval tv[2]; - memset(tv, 0, sizeof(struct timeval) * 2); - tv[0].tv_sec = mtime; - tv[1].tv_sec = tv[0].tv_sec; - if(-1 == futimes(fd, tv)){ - FGPRINT(" put_headers: futimes failed. errno(%d)\n", errno); - SYSLOGERR("futimes failed. errno(%d)", errno); - return -errno; + // multipart + if(0 != (result = s3fscurl.MultipartHeadRequest(path, buf.st_size, meta, ow_sse_flg))){ + return result; } - }else if(use_cache.size() > 0){ - // Use local cache file. - struct stat st; - struct utimbuf n_mtime; - string cache_path(use_cache + "/" + bucket + path); - - if((stat(cache_path.c_str(), &st)) == 0){ - n_mtime.modtime = mtime; - n_mtime.actime = n_mtime.modtime; - if((utime(cache_path.c_str(), &n_mtime)) == -1){ - FGPRINT(" put_headers: utime failed. errno(%d)\n", errno); - SYSLOGERR("utime failed. errno(%d)", errno); - return -errno; - } - } - } - return 0; -} - -static int put_multipart_headers(const char* path, headers_t& meta, bool ow_sse_flg) -{ - int result; - struct stat buf; - S3fsCurl s3fscurl; - - FGPRINT(" put_multipart_headers[path=%s]\n", path); - - // already checked by check_object_access(), so only get attr. - if(0 != (result = get_object_attribute(path, &buf))){ - return result; - } - - // multipart copy - if(0 != (result = s3fscurl.MultipartHeadRequest(path, buf.st_size, meta, ow_sse_flg))){ - return result; - } - - // Update mtime in local file cache. - if(0 < use_cache.size()){ - struct stat st; - struct utimbuf n_mtime; - string cache_path(use_cache + "/" + bucket + path); - - if(0 == stat(cache_path.c_str(), &st)){ - n_mtime.modtime = get_mtime(meta); - n_mtime.actime = n_mtime.modtime; - if(-1 == utime(cache_path.c_str(), &n_mtime)){ - FGPRINT(" put_multipart_headers: utime failed. errno(%d)\n", errno); - SYSLOGERR("utime failed. errno(%d)", errno); - return -errno; - } - } - } - return 0; -} - -/** - * create or update s3 object - * @return fuse return code - */ -static int put_local_fd(const char* path, headers_t meta, int fd, bool ow_sse_flg) -{ - int result; - struct stat st; - - FGPRINT(" put_local_fd[path=%s][fd=%d]\n", path, fd); - - if(fstat(fd, &st) == -1){ - FGPRINT(" put_local_fd: fstatfailed. errno(%d)\n", errno); - SYSLOGERR("fstat failed. 
errno(%d)", errno); - return -errno; - } - - /* - * Make decision to do multi upload (or not) based upon file size - * - * According to the AWS spec: - * - 1 to 10,000 parts are allowed - * - minimum size of parts is 5MB (expect for the last part) - * - * For our application, we will define part size to be 10MB (10 * 2^20 Bytes) - * maximum file size will be ~64 GB - 2 ** 36 - * - * Initially uploads will be done serially - * - * If file is > 20MB, then multipart will kick in - */ - if(st.st_size > MAX_OBJECT_SIZE){ // 64GB - 1 - // close f ? - return -ENOTSUP; - } - - // seek to head of file. - if(0 != lseek(fd, 0, SEEK_SET)){ - SYSLOGERR("line %d: lseek: %d", __LINE__, -errno); - FGPRINT(" put_local_fd - lseek error(%d)\n", -errno); - return -errno; - } - - if(st.st_size >= MULTIPART_LOWLIMIT && !nomultipart){ // 20MB - // Additional time is needed for large files - time_t backup = 0; - if(120 > S3fsCurl::GetReadwriteTimeout()){ - backup = S3fsCurl::SetReadwriteTimeout(120); - } - result = S3fsCurl::ParallelMultipartUploadRequest(path, meta, fd, ow_sse_flg); - if(0 != backup){ - S3fsCurl::SetReadwriteTimeout(backup); - } }else{ - S3fsCurl s3fscurl; - result = s3fscurl.PutRequest(path, meta, fd, ow_sse_flg); + if(0 != (result = s3fscurl.PutHeadRequest(path, meta, ow_sse_flg))){ + return result; + } } - // seek to head of file. - if(0 != lseek(fd, 0, SEEK_SET)){ - SYSLOGERR("line %d: lseek: %d", __LINE__, -errno); - FGPRINT(" put_local_fd - lseek error(%d)\n", -errno); - return -errno; + FdEntity* ent = NULL; + if(NULL == (ent = FdManager::get()->ExistOpen(path))){ + // no opened fd + if(FdManager::get()->IsCacheDir()){ + // create cache file if be needed + ent = FdManager::get()->Open(path, buf.st_size, -1, false, true); + } + } + if(ent){ + time_t mtime = get_mtime(meta); + ent->SetMtime(mtime); + FdManager::get()->Close(ent); } - return result; + return 0; } static int s3fs_getattr(const char* path, struct stat* stbuf) { int result; - int fd = -1; FGPRINT("s3fs_getattr[path=%s]\n", path); @@ -866,10 +677,14 @@ static int s3fs_getattr(const char* path, struct stat* stbuf) } // If has already opened fd, the st_size shuld be instead. 
// (See: Issue 241) - if(stbuf && FdCache::getFdCacheData()->Get(path, &fd) && -1 != fd){ - struct stat tmpstbuf; - if(0 == fstat(fd, &tmpstbuf)){ - stbuf->st_size = tmpstbuf.st_size; + if(stbuf){ + FdEntity* ent; + if(NULL != (ent = FdManager::get()->ExistOpen(path))){ + struct stat tmpstbuf; + if(ent->GetStats(tmpstbuf)){ + stbuf->st_size = tmpstbuf.st_size; + } + FdManager::get()->Close(ent); } } return result; @@ -877,42 +692,39 @@ static int s3fs_getattr(const char* path, struct stat* stbuf) static int s3fs_readlink(const char* path, char* buf, size_t size) { - int fd = -1; - - if(size > 0){ - --size; // reserve nil terminator - - FGPRINT("s3fs_readlink[path=%s]\n", path); - - if(0 > (fd = get_local_fd(path))){ - SYSLOGERR("line %d: get_local_fd: %d", __LINE__, -fd); - return -EIO; - } - - struct stat st; - if(fstat(fd, &st) == -1){ - SYSLOGERR("line %d: fstat: %d", __LINE__, -errno); - if(fd > 0){ - close(fd); - } - return -errno; - } - if(st.st_size < (off_t)size){ - size = st.st_size; - } - if(-1 == pread(fd, buf, size, 0)){ - SYSLOGERR("line %d: pread: %d", __LINE__, -errno); - if(fd > 0){ - close(fd); - } - return -errno; - } - buf[size] = 0; + if(!path || !buf || 0 >= size){ + return 0; } - - if(fd > 0){ - close(fd); + // Open + FdEntity* ent; + if(NULL == (ent = get_local_fent(path))){ + FGPRINT("s3fs_readlink: could not get fent(file=%s)\n", path); + SYSLOGERR("could not get fent(file=%s)", path); + return -EIO; } + // Get size + size_t readsize; + if(!ent->GetSize(readsize)){ + FGPRINT("s3fs_readlink: could not get file size(file=%s)\n", path); + SYSLOGERR("could not get file size(file=%s)", path); + FdManager::get()->Close(ent); + return -EIO; + } + if(size <= readsize){ + readsize = size - 1; + } + // Read + ssize_t ressize; + if(0 > (ressize = ent->Read(buf, 0, readsize))){ + FGPRINT("s3fs_readlink: could not read file(file=%s, errno=%zd)\n", path, ressize); + SYSLOGERR("could not read file(file=%s, errno=%zd)", path, ressize); + FdManager::get()->Close(ent); + return static_cast(ressize); + } + buf[ressize] = '\0'; + + FdManager::get()->Close(ent); + return 0; } @@ -971,12 +783,11 @@ static int s3fs_create(const char* path, mode_t mode, struct fuse_file_info* fi) return result; } - // object created, open it - if((fi->fh = get_local_fd(path)) <= 0){ + FdEntity* ent; + if(NULL == (ent = FdManager::get()->Open(path, 0, -1, false, true))){ return -EIO; } - // remember flags and headers... 
- FdCache::getFdCacheData()->Add(path, fi->fh, fi->flags); + fi->fh = ent->GetFd(); return 0; } @@ -1042,6 +853,7 @@ static int s3fs_unlink(const char* path) } S3fsCurl s3fscurl; result = s3fscurl.DeleteRequest(path); + FdManager::DeleteCacheFile(path); StatCache::getStatCacheData()->DelStat(path); return result; @@ -1119,7 +931,6 @@ static int s3fs_rmdir(const char* path) static int s3fs_symlink(const char* from, const char* to) { int result; - int fd = -1; struct fuse_context* pcxt; FGPRINT("s3fs_symlink[from=%s][to=%s]\n", from, to); @@ -1144,31 +955,31 @@ static int s3fs_symlink(const char* from, const char* to) headers["x-amz-meta-uid"] = str(pcxt->uid); headers["x-amz-meta-gid"] = str(pcxt->gid); - fd = fileno(tmpfile()); - if(fd == -1){ - SYSLOGERR("line %d: error: fileno(tmpfile()): %d", __LINE__, -errno); + // open tmpfile + FdEntity* ent; + if(NULL == (ent = FdManager::get()->Open(to, 0, -1, true, true))){ + FGPRINT("s3fs_symlink: could not open tmpfile(errno=%d)\n", errno); + SYSLOGERR("could not open tmpfile(errno=%d)", errno); return -errno; } - - if(pwrite(fd, from, strlen(from), 0) == -1){ - SYSLOGERR("line %d: error: pwrite: %d", __LINE__, -errno); - if(fd > 0){ - close(fd); - } + // write + ssize_t from_size = strlen(from); + if(from_size != ent->Write(from, 0, from_size)){ + FGPRINT("s3fs_symlink: could not write tmpfile(errno=%d)\n", errno); + SYSLOGERR("could not write tmpfile(errno=%d)", errno); + FdManager::get()->Close(ent); return -errno; } + // upload + if(0 != (result = ent->Flush(headers, true, true))){ + FGPRINT("s3fs_symlink: could not upload tmpfile(result=%d)\n", result); + SYSLOGERR("could not upload tmpfile(result=%d)", result); + } + FdManager::get()->Close(ent); - if(0 != (result = put_local_fd(to, headers, fd, true))){ - if(fd > 0){ - close(fd); - } - return result; - } - if(fd > 0){ - close(fd); - } StatCache::getStatCacheData()->DelStat(to); - return 0; + + return result; } static int rename_object(const char* from, const char* to) @@ -1210,8 +1021,6 @@ static int rename_object_nocopy(const char* from, const char* to) { int result; headers_t meta; - int fd; - int isclose = 1; FGPRINT("rename_object_nocopy [from=%s] [to=%s]\n", from , to); SYSLOGDBG("rename_object_nocopy [from=%s] [to=%s]", from, to); @@ -1225,37 +1034,30 @@ static int rename_object_nocopy(const char* from, const char* to) return result; } - // Downloading - if(0 > (fd = get_opened_fd(from))){ - if(0 > (fd = get_local_fd(from))){ - FGPRINT(" rename_object_nocopy line %d: get_local_fd result: %d\n", __LINE__, fd); - SYSLOGERR("rename_object_nocopy line %d: get_local_fd result: %d", __LINE__, fd); - return -EIO; - } - }else{ - isclose = 0; - } - // Get attributes if(0 != (result = get_object_attribute(from, NULL, &meta))){ - if(isclose){ - close(fd); - } return result; } // Set header meta["Content-Type"] = S3fsCurl::LookupMimeType(string(to)); - // Re-uploading - result = put_local_fd(to, meta, fd, false); - if(isclose){ - close(fd); + // open & load + FdEntity* ent; + if(NULL == (ent = get_local_fent(from, true))){ + FGPRINT(" rename_object_nocopy: could not open and read file(%s)\n", from); + SYSLOGERR("could not open and read file(%s)", from); + return -EIO; } - if(0 != result){ - FGPRINT(" rename_object_nocopy line %d: put_local_fd result: %d\n", __LINE__, result); + + // upload + if(0 != (result = ent->RowFlush(to, meta, false, true))){ + FGPRINT(" rename_object_nocopy: could not upload file(%s): result=%d\n", to, result); + SYSLOGERR("could not upload file(%s): result=%d", to, 
result); + FdManager::get()->Close(ent); return result; } + FdManager::get()->Close(ent); // Remove file result = s3fs_unlink(from); @@ -1610,42 +1412,29 @@ static int s3fs_chmod_nocopy(const char* path, mode_t mode) } }else{ // normal object or directory object of newer version - int fd; - int isclose = 1; - - // Downloading - if(0 > (fd = get_opened_fd(strpath.c_str()))){ - if(0 > (fd = get_local_fd(strpath.c_str()))){ - FGPRINT(" s3fs_chmod_nocopy line %d: get_local_fd result: %d\n", __LINE__, fd); - SYSLOGERR("s3fs_chmod_nocopy line %d: get_local_fd result: %d", __LINE__, fd); - return -EIO; - } - }else{ - isclose = 0; - } // Change file mode meta["x-amz-meta-mode"] = str(mode); - // Change local file mode - if(-1 == fchmod(fd, mode)){ - if(isclose){ - close(fd); - } - FGPRINT(" s3fs_chmod_nocopy line %d: fchmod(fd=%d) error(%d)\n", __LINE__, fd, errno); - SYSLOGERR("s3fs_chmod_nocopy line %d: fchmod(fd=%d) error(%d)", __LINE__, fd, errno); - return -errno; + + // open & load + FdEntity* ent; + if(NULL == (ent = get_local_fent(strpath.c_str(), true))){ + FGPRINT(" s3fs_chmod_nocopy: could not open and read file(%s)\n", strpath.c_str()); + SYSLOGERR("could not open and read file(%s)", strpath.c_str()); + return -EIO; } - // Re-uploading - if(0 != (result = put_local_fd(strpath.c_str(), meta, fd, false))){ - FGPRINT(" s3fs_chmod_nocopy line %d: put_local_fd result: %d\n", __LINE__, result); - } - if(isclose){ - close(fd); + // upload + if(0 != (result = ent->Flush(meta, false, true))){ + FGPRINT(" s3fs_chmod_nocopy: could not upload file(%s): result=%d\n", strpath.c_str(), result); + SYSLOGERR("could not upload file(%s): result=%d", strpath.c_str(), result); + FdManager::get()->Close(ent); + return result; } + FdManager::get()->Close(ent); + StatCache::getStatCacheData()->DelStat(nowcache); } - return result; } @@ -1785,44 +1574,30 @@ static int s3fs_chown_nocopy(const char* path, uid_t uid, gid_t gid) } }else{ // normal object or directory object of newer version - int fd; - int isclose = 1; - - // Downloading - if(0 > (fd = get_opened_fd(strpath.c_str()))){ - if(0 > (fd = get_local_fd(strpath.c_str()))){ - FGPRINT(" s3fs_chown_nocopy line %d: get_local_fd result: %d\n", __LINE__, fd); - SYSLOGERR("s3fs_chown_nocopy line %d: get_local_fd result: %d", __LINE__, fd); - return -EIO; - } - }else{ - isclose = 0; - } // Change owner meta["x-amz-meta-uid"] = str(uid); meta["x-amz-meta-gid"] = str(gid); - // Change local file owner - if(-1 == fchown(fd, uid, gid)){ - if(isclose){ - close(fd); - } - FGPRINT(" s3fs_chown_nocopy line %d: fchown(fd=%d, uid=%d, gid=%d) is error(%d)\n", __LINE__, fd, (int)uid, (int)gid, errno); - SYSLOGERR("s3fs_chown_nocopy line %d: fchown(fd=%d, uid=%d, gid=%d) is error(%d)", __LINE__, fd, (int)uid, (int)gid, errno); - return -errno; + // open & load + FdEntity* ent; + if(NULL == (ent = get_local_fent(strpath.c_str(), true))){ + FGPRINT(" s3fs_chown_nocopy: could not open and read file(%s)\n", strpath.c_str()); + SYSLOGERR("could not open and read file(%s)", strpath.c_str()); + return -EIO; } - // Re-uploading - if(0 != (result = put_local_fd(strpath.c_str(), meta, fd, false))){ - FGPRINT(" s3fs_chown_nocopy line %d: put_local_fd result: %d\n", __LINE__, result); - } - if(isclose){ - close(fd); + // upload + if(0 != (result = ent->Flush(meta, false, true))){ + FGPRINT(" s3fs_chown_nocopy: could not upload file(%s): result=%d\n", strpath.c_str(), result); + SYSLOGERR("could not upload file(%s): result=%d", strpath.c_str(), result); + FdManager::get()->Close(ent); + 
return result; } + FdManager::get()->Close(ent); + StatCache::getStatCacheData()->DelStat(nowcache); } - return result; } @@ -1937,55 +1712,45 @@ static int s3fs_utimens_nocopy(const char* path, const struct timespec ts[2]) } }else{ // normal object or directory object of newer version - int fd; - int isclose = 1; - struct timeval tv[2]; - - // Downloading - if(0 > (fd = get_opened_fd(strpath.c_str()))){ - if(0 > (fd = get_local_fd(strpath.c_str()))){ - FGPRINT(" s3fs_utimens_nocopy line %d: get_local_fd result: %d\n", __LINE__, fd); - SYSLOGERR("s3fs_utimens_nocopy line %d: get_local_fd result: %d", __LINE__, fd); - return -EIO; - } - }else{ - isclose = 0; - } // Change date meta["x-amz-meta-mtime"] = str(ts[1].tv_sec); - // Change local file date - TIMESPEC_TO_TIMEVAL(&tv[0], &ts[0]); - TIMESPEC_TO_TIMEVAL(&tv[1], &ts[1]); - if(-1 == futimes(fd, tv)){ - if(isclose){ - close(fd); - } - FGPRINT(" s3fs_utimens_nocopy line %d: futimes(fd=%d, ...) is error(%d)\n", __LINE__, fd, errno); - SYSLOGERR("s3fs_utimens_nocopy line %d: futimes(fd=%d, ...) is error(%d)", __LINE__, fd, errno); - return -errno; + // open & load + FdEntity* ent; + if(NULL == (ent = get_local_fent(strpath.c_str(), true))){ + FGPRINT(" s3fs_utimens_nocopy: could not open and read file(%s)\n", strpath.c_str()); + SYSLOGERR("could not open and read file(%s)", strpath.c_str()); + return -EIO; } - // Re-uploading - if(0 != (result = put_local_fd(strpath.c_str(), meta, fd, false))){ - FGPRINT(" s3fs_utimens_nocopy line %d: put_local_fd result: %d\n", __LINE__, result); + // set mtime + if(0 != (result = ent->SetMtime(ts[1].tv_sec))){ + FGPRINT(" s3fs_utimens_nocopy: could not set mtime to file(%s): result=%d\n", strpath.c_str(), result); + SYSLOGERR("could not set mtime to file(%s): result=%d", strpath.c_str(), result); + FdManager::get()->Close(ent); + return result; } - if(isclose){ - close(fd); + + // upload + if(0 != (result = ent->Flush(meta, false, true))){ + FGPRINT(" s3fs_utimens_nocopy: could not upload file(%s): result=%d\n", strpath.c_str(), result); + SYSLOGERR("could not upload file(%s): result=%d", strpath.c_str(), result); + FdManager::get()->Close(ent); + return result; } + FdManager::get()->Close(ent); + StatCache::getStatCacheData()->DelStat(nowcache); } - return result; } static int s3fs_truncate(const char* path, off_t size) { - int fd = -1; int result; headers_t meta; - int isclose = 1; + FdEntity* ent = NULL; FGPRINT("s3fs_truncate[path=%s][size=%zd]\n", path, size); @@ -1998,41 +1763,37 @@ static int s3fs_truncate(const char* path, off_t size) // Get file information if(0 == (result = get_object_attribute(path, NULL, &meta))){ - // Exists -> Get file - if(0 > (fd = get_opened_fd(path))){ - if(0 > (fd = get_local_fd(path))){ - FGPRINT(" s3fs_truncate line %d: get_local_fd result: %d\n", __LINE__, fd); - SYSLOGERR("s3fs_truncate line %d: get_local_fd result: %d", __LINE__, fd); - return -EIO; - } - }else{ - isclose = 0; + // Exists -> Get file(with size) + if(NULL == (ent = FdManager::get()->Open(path, size, -1, false, true))){ + FGPRINT(" s3fs_truncate: could not open file(%s): errno=%d\n", path, errno); + SYSLOGERR("could not open file(%s): errno=%d", path, errno); + return -EIO; } + if(!ent->Load(0, size)){ + FGPRINT(" s3fs_truncate: could not download file(%s): errno=%d\n", path, errno); + SYSLOGERR("could not download file(%s): errno=%d", path, errno); + FdManager::get()->Close(ent); + return -EIO; + } + }else{ - // Not found -> Make tmpfile - if(-1 == (fd = fileno(tmpfile()))){ - SYSLOGERR("error: line 
%d: %d", __LINE__, -errno); - return -errno; + // Not found -> Make tmpfile(with size) + if(NULL == (ent = FdManager::get()->Open(path, size, -1, true, true))){ + FGPRINT(" s3fs_truncate: could not open file(%s): errno=%d\n", path, errno); + SYSLOGERR("could not open file(%s): errno=%d", path, errno); + return -EIO; } } - // Truncate - if(0 != ftruncate(fd, size) || 0 != fsync(fd)){ - FGPRINT(" s3fs_truncate line %d: ftruncate or fsync returned err(%d)\n", __LINE__, errno); - SYSLOGERR("s3fs_truncate line %d: ftruncate or fsync returned err(%d)", __LINE__, errno); - if(isclose){ - close(fd); - } - return -errno; + // upload + if(0 != (result = ent->Flush(meta, false, true))){ + FGPRINT(" s3fs_chmod_nocopy: could not upload file(%s): result=%d\n", path, result); + SYSLOGERR("could not upload file(%s): result=%d", path, result); + FdManager::get()->Close(ent); + return result; } + FdManager::get()->Close(ent); - // Re-uploading - if(0 != (result = put_local_fd(path, meta, fd, false))){ - FGPRINT(" s3fs_truncate line %d: put_local_fd result: %d\n", __LINE__, result); - } - if(isclose){ - close(fd); - } StatCache::getStatCacheData()->DelStat(path); return result; @@ -2042,6 +1803,7 @@ static int s3fs_open(const char* path, struct fuse_file_info* fi) { int result; headers_t meta; + struct stat st; FGPRINT("s3fs_open[path=%s][flags=%d]\n", path, fi->flags); @@ -2049,7 +1811,8 @@ static int s3fs_open(const char* path, struct fuse_file_info* fi) if(0 != (result = check_parent_object_access(path, X_OK))){ return result; } - result = check_object_access(path, mask, NULL); + + result = check_object_access(path, mask, &st); if(-ENOENT == result){ if(0 != (result = check_parent_object_access(path, W_OK))){ return result; @@ -2058,50 +1821,81 @@ static int s3fs_open(const char* path, struct fuse_file_info* fi) return result; } - // Go do the truncation if called for if((unsigned int)fi->flags & O_TRUNC){ - result = s3fs_truncate(path, 0); - if(result != 0) - return result; + st.st_size = 0; + } + if(!S_ISREG(st.st_mode) || S_ISLNK(st.st_mode)){ + st.st_mtime = -1; } - if((fi->fh = get_local_fd(path)) <= 0){ + FdEntity* ent; + if(NULL == (ent = FdManager::get()->Open(path, st.st_size, st.st_mtime, false, true))){ return -EIO; } - // remember flags and headers... - FdCache::getFdCacheData()->Add(path, fi->fh, fi->flags); + fi->fh = ent->GetFd(); return 0; } static int s3fs_read(const char* path, char* buf, size_t size, off_t offset, struct fuse_file_info* fi) { - int res; + ssize_t res; // Commented - This message is output too much -//FGPRINT("s3fs_read[path=%s][size=%zd][offset=%zd][fd=%zd]\n", path, size, offset, fi->fh); + FGPRINT2("s3fs_read[path=%s][size=%zd][offset=%zd][fd=%zd]\n", path, size, offset, fi->fh); - if(-1 == (res = pread(fi->fh, buf, size, offset))){ - FGPRINT(" s3fs_read: pread failed. errno(%d)\n", errno); - SYSLOGERR("pread failed. 
errno(%d)", errno); - return -errno; + FdEntity* ent; + if(NULL == (ent = FdManager::get()->ExistOpen(path))){ + FGPRINT(" s3fs_read: could not find opened fd(%s)\n", path); + SYSLOGERR("could not find opened fd(%s)", path); + return -EIO; } - return res; + if(ent->GetFd() != static_cast(fi->fh)){ + FGPRINT(" s3fs_read: Warning - different fd(%d - %zd)\n", ent->GetFd(), fi->fh); + SYSLOGERR("Warning - different fd(%d - %zd)", ent->GetFd(), fi->fh); + } + + // check real file size + size_t realsize = 0; + if(!ent->GetSize(realsize) || 0 >= realsize){ + //FGPRINT(" s3fs_read: file size is 0, so break to read.\n"); + FdManager::get()->Close(ent); + return 0; + } + + if(0 > (res = ent->Read(buf, offset, size, false))){ + FGPRINT(" s3fs_read: failed to read file(%s). result=%zd\n", path, res); + SYSLOGERR("failed to read file(%s). result=%zd", path, res); + } + FdManager::get()->Close(ent); + + return static_cast(res); } static int s3fs_write(const char* path, const char* buf, size_t size, off_t offset, struct fuse_file_info* fi) { - int res; + ssize_t res; // Commented - This message is output too much -//FGPRINT("s3fs_write[path=%s][size=%zd][offset=%zd][fd=%zd]\n", path, size, offset, fi->fh); + FGPRINT2("s3fs_write[path=%s][size=%zd][offset=%zd][fd=%zd]\n", path, size, offset, fi->fh); - if(-1 == (res = pwrite(fi->fh, buf, size, offset))){ - FGPRINT(" s3fs_write: pwrite failed. errno(%d)\n", errno); - SYSLOGERR("pwrite failed. errno(%d)", errno); - return -errno; + FdEntity* ent; + if(NULL == (ent = FdManager::get()->ExistOpen(path))){ + FGPRINT(" s3fs_write: could not find opened fd(%s)\n", path); + SYSLOGERR("could not find opened fd(%s)", path); + return -EIO; } - return res; + if(ent->GetFd() != static_cast(fi->fh)){ + FGPRINT(" s3fs_write: Warning - different fd(%d - %zd)\n", ent->GetFd(), fi->fh); + SYSLOGERR("Warning - different fd(%d - %zd)", ent->GetFd(), fi->fh); + } + if(0 > (res = ent->Write(buf, offset, size))){ + FGPRINT(" s3fs_write: failed to write file(%s). result=%zd\n", path, res); + SYSLOGERR("failed to write file(%s). result=%zd", path, res); + } + FdManager::get()->Close(ent); + + return static_cast(res); } static int s3fs_statfs(const char* path, struct statvfs* stbuf) @@ -2115,20 +1909,11 @@ static int s3fs_statfs(const char* path, struct statvfs* stbuf) return 0; } -static int get_flags(int fd) -{ - int flags = 0; - FdCache::getFdCacheData()->Get(fd, &flags); - return flags; -} - static int s3fs_flush(const char* path, struct fuse_file_info* fi) { - int flags; int result; - int fd = fi->fh; - FGPRINT("s3fs_flush[path=%s][fd=%d]\n", path, fd); + FGPRINT("s3fs_flush[path=%s][fd=%zd]\n", path, fi->fh); int mask = (O_RDONLY != (fi->flags & O_ACCMODE) ? W_OK : R_OK); if(0 != (result = check_parent_object_access(path, X_OK))){ @@ -2143,53 +1928,51 @@ static int s3fs_flush(const char* path, struct fuse_file_info* fi) return result; } - // NOTE- fi->flags is not available here - flags = get_flags(fd); - if(O_RDONLY != (flags & O_ACCMODE)){ + FdEntity* ent; + if(NULL != (ent = FdManager::get()->ExistOpen(path))){ headers_t meta; if(0 != (result = get_object_attribute(path, NULL, &meta))){ - return result; - } - - // if the cached file matches the remote file skip uploading - struct stat st; - if(-1 == fstat(fd, &st)){ - FGPRINT(" s3fs_flush: fstat failed. errno(%d)\n", errno); - SYSLOGERR("fstat failed. 
errno(%d)", errno); - return -errno; - } - - if(str(st.st_size) == meta["Content-Length"] && - (str(st.st_mtime) == meta["x-amz-meta-mtime"])){ + FdManager::get()->Close(ent); return result; } // If both mtime are not same, force to change mtime based on fd. - if(str(st.st_mtime) != meta["x-amz-meta-mtime"]){ - meta["x-amz-meta-mtime"] = str(st.st_mtime); + time_t ent_mtime; + if(ent->GetMtime(ent_mtime)){ + if(str(ent_mtime) != meta["x-amz-meta-mtime"]){ + meta["x-amz-meta-mtime"] = str(ent_mtime); + } } - - // when updates file, always updates sse mode. - return put_local_fd(path, meta, fd, true); + result = ent->Flush(meta, true, false); + FdManager::get()->Close(ent); } - - return 0; + return result; } static int s3fs_release(const char* path, struct fuse_file_info* fi) { FGPRINT("s3fs_release[path=%s][fd=%ld]\n", path, fi->fh); - // clear file discriptor mapping. - if(!FdCache::getFdCacheData()->Del(path, fi->fh)){ - FGPRINT(" s3fs_release: failed to release fd[path=%s][fd=%ld]\n", path, fi->fh); + FdEntity* ent; + if(NULL == (ent = FdManager::get()->GetFdEntity(path))){ + FGPRINT(" s3fs_release: could not find fd(file=%s)\n", path); + SYSLOGERR("could not find fd(file=%s)", path); + return -EIO; + } + if(ent->GetFd() != static_cast(fi->fh)){ + FGPRINT(" s3fs_release: Warning - different fd(%d - %zd)\n", ent->GetFd(), fi->fh); + SYSLOGERR("Warning - different fd(%d - %zd)", ent->GetFd(), fi->fh); + } + FdManager::get()->Close(ent); + + // check - for debug + if(debug){ + if(NULL != (ent = FdManager::get()->GetFdEntity(path))){ + FGPRINT(" s3fs_release: Warning - file(%s),fd(%d) is still opened.\n", path, ent->GetFd()); + SYSLOGERR("Warning - file(%s),fd(%d) is still opened.", path, ent->GetFd()); + } } - if(-1 == close(fi->fh)){ - FGPRINT(" s3fs_release: close failed. errno(%d)\n", errno); - SYSLOGERR("close failed. 
errno(%d)", errno); - return -errno; - } if((fi->flags & O_RDWR) || (fi->flags & O_WRONLY)){ StatCache::getStatCacheData()->DelStat(path); } @@ -2712,6 +2495,12 @@ static void* s3fs_init(struct fuse_conn_info* conn) if((unsigned int)conn->capable & FUSE_CAP_ATOMIC_O_TRUNC){ conn->want |= FUSE_CAP_ATOMIC_O_TRUNC; } + // cache + if(is_remove_cache && !FdManager::DeleteCacheDirectory()){ + //FGPRINT("s3fs_init: Could not inilialize cache directory.\n"); + //SYSLOGDBG("Could not inilialize cache directory."); + } + return 0; } @@ -2728,6 +2517,12 @@ static void s3fs_destroy(void*) } free(mutex_buf); mutex_buf = NULL; + + // cache + if(is_remove_cache && !FdManager::DeleteCacheDirectory()){ + //FGPRINT("s3fs_destroy: Could not remove cache directory.\n"); + //SYSLOGDBG("Could not remove cache directory."); + } } static int s3fs_access(const char* path, int mask) @@ -3225,7 +3020,11 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar return 0; } if(strstr(arg, "use_cache=") != 0){ - use_cache = strchr(arg, '=') + sizeof(char); + FdManager::SetCacheDir(strchr(arg, '=') + sizeof(char)); + return 0; + } + if(strstr(arg, "del_cache") != 0){ + is_remove_cache = true; return 0; } if(strstr(arg, "multireq_max=") != 0){ @@ -3343,14 +3142,24 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar S3fsCurl::SetDnsCache(false); return 0; } - if(strstr(arg, "parallel_upload=") != 0){ + if(strstr(arg, "parallel_count=") != 0 || strstr(arg, "parallel_upload=") != 0){ int maxpara = (int)strtoul(strchr(arg, '=') + sizeof(char), 0, 10); if(0 >= maxpara){ - fprintf(stderr, "%s: argument should be over 1: parallel_upload\n", + fprintf(stderr, "%s: argument should be over 1: parallel_count\n", program_name.c_str()); return -1; } - S3fsCurl::SetMaxParallelUpload(maxpara); + S3fsCurl::SetMaxParallelCount(maxpara); + return 0; + } + if(strstr(arg, "fd_page_size=") != 0){ + ssize_t pagesize = static_cast(strtoul(strchr(arg, '=') + sizeof(char), 0, 10)); + if((1024 * 1024) >= pagesize){ + fprintf(stderr, "%s: argument should be over 1MB: fd_page_size\n", + program_name.c_str()); + return -1; + } + FdManager::SetPageSize(pagesize); return 0; } if(strstr(arg, "noxmlns") != 0){ @@ -3391,7 +3200,7 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar // debug output if((strcmp(arg, "-d") == 0) || (strcmp(arg, "--debug") == 0)){ if(!debug){ - debug = 1; + debug = true; return 0; }else{ // fuse doesn't understand "--debug", but it @@ -3404,6 +3213,11 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar } } } + // for deep debugging message + if(strstr(arg, "f2") != 0){ + foreground2 = true; + return 0; + } if(strstr(arg, "accessKeyId=") != 0){ fprintf(stderr, "%s: option accessKeyId is no longer supported\n", @@ -3457,7 +3271,7 @@ int main(int argc, char* argv[]) case 'd': break; case 'f': - foreground = 1; + foreground = true; break; case 's': break; diff --git a/src/s3fs_util.cpp b/src/s3fs_util.cpp index 3eef28b..42ee761 100644 --- a/src/s3fs_util.cpp +++ b/src/s3fs_util.cpp @@ -28,6 +28,9 @@ #include #include #include +#include +#include +#include #include #include @@ -397,6 +400,56 @@ void free_mvnodes(MVNODE *head) return; } +//------------------------------------------------------------------- +// Class AutoLock +//------------------------------------------------------------------- +AutoLock::AutoLock(pthread_mutex_t* pmutex) : auto_mutex(pmutex), is_locked(false) +{ + Lock(); +} + 
+AutoLock::~AutoLock() +{ + Unlock(); +} + +bool AutoLock::Lock(void) +{ + if(!auto_mutex){ + return false; + } + if(is_locked){ + // already locked + return true; + } + try{ + pthread_mutex_lock(auto_mutex); + is_locked = true; + }catch(exception& e){ + is_locked = false; + return false; + } + return true; +} + +bool AutoLock::Unlock(void) +{ + if(!auto_mutex){ + return false; + } + if(!is_locked){ + // already unlocked + return true; + } + try{ + pthread_mutex_unlock(auto_mutex); + is_locked = false; + }catch(exception& e){ + return false; + } + return true; +} + //------------------------------------------------------------------- // Utility for UID/GID //------------------------------------------------------------------- @@ -494,6 +547,53 @@ int mkdirp(const string& path, mode_t mode) return 0; } +bool delete_files_in_dir(const char* dir, bool is_remove_own) +{ + DIR* dp; + struct dirent* dent; + + if(NULL == (dp = opendir(dir))){ + //FGPRINT("delete_files_in_dir: could not open dir(%s) - errno(%d)\n", dir, errno); + return false; + } + + for(dent = readdir(dp); dent; dent = readdir(dp)){ + if(0 == strcmp(dent->d_name, "..") || 0 == strcmp(dent->d_name, ".")){ + continue; + } + string fullpath = dir; + fullpath += "/"; + fullpath += dent->d_name; + struct stat st; + if(0 != lstat(fullpath.c_str(), &st)){ + FGPRINT("delete_files_in_dir: could not get stats of file(%s) - errno(%d)\n", fullpath.c_str(), errno); + closedir(dp); + return false; + } + if(S_ISDIR(st.st_mode)){ + // dir -> Reentrant + if(!delete_files_in_dir(fullpath.c_str(), true)){ + //FGPRINT("delete_files_in_dir: could not remove sub dir(%s) - errno(%d)\n", fullpath.c_str(), errno); + closedir(dp); + return false; + } + }else{ + if(0 != unlink(fullpath.c_str())){ + FGPRINT("delete_files_in_dir: could not remove file(%s) - errno(%d)\n", fullpath.c_str(), errno); + closedir(dp); + return false; + } + } + } + closedir(dp); + + if(is_remove_own && 0 != rmdir(dir)){ + FGPRINT("delete_files_in_dir: could not remove dir(%s) - errno(%d)\n", dir, errno); + return false; + } + return true; +} + //------------------------------------------------------------------- // Utility functions for convert //------------------------------------------------------------------- @@ -709,6 +809,9 @@ void show_help (void) " use_cache (default=\"\" which means disabled)\n" " - local folder to use for local file cache\n" "\n" + " del_cache (delete local file cache)\n" + " - delete local file cache when s3fs starts and exits.\n" + "\n" " use_rrs (default is disable)\n" " - this option makes Amazon's Reduced Redundancy Storage enable.\n" "\n" @@ -749,7 +852,7 @@ void show_help (void) " multireq_max (default=\"500\")\n" " - maximum number of parallel request for listing objects.\n" "\n" - " parallel_upload (default=\"5\")\n" + " parallel_count (default=\"5\")\n" " - number of parallel request for uploading big objects.\n" " s3fs uploads large object(over 20MB) by multipart post request, \n" " and sends parallel requests.\n" @@ -757,6 +860,14 @@ void show_help (void) " at once. It is necessary to set this value depending on a CPU \n" " and a network band.\n" "\n" + " fd_page_size (default=\"52428800\"(50MB))\n" + " - number of internal management page size for each file discriptor.\n" + " For delayed reading and writing by s3fs, s3fs manages pages which \n" + " is separated from object. 
Each page has a status that shows whether the data is \n" + "      already loaded (or not loaded yet).\n" + "      This option should not be changed unless you have a problem \n" + "      with performance.\n" + "\n" "   url (default=\"http://s3.amazonaws.com\")\n" "      - sets the url to use to access amazon s3\n" "\n" diff --git a/src/s3fs_util.h b/src/s3fs_util.h index 90fde86..e400757 100644 --- a/src/s3fs_util.h +++ b/src/s3fs_util.h @@ -64,6 +64,20 @@ typedef struct mvnode { struct mvnode *next; } MVNODE; +class AutoLock +{ + private: + pthread_mutex_t* auto_mutex; + bool is_locked; + + public: + AutoLock(pthread_mutex_t* pmutex = NULL); + ~AutoLock(); + + bool Lock(void); + bool Unlock(void); +}; + //------------------------------------------------------------------- // Functions //------------------------------------------------------------------- @@ -79,6 +93,7 @@ int is_uid_inculde_group(uid_t uid, gid_t gid); std::string mydirname(std::string path); std::string mybasename(std::string path); int mkdirp(const std::string& path, mode_t mode); +bool delete_files_in_dir(const char* dir, bool is_remove_own); time_t get_mtime(const char *s); time_t get_mtime(headers_t& meta, bool overcheck = true);
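
For reviewers, the following is a minimal, hypothetical sketch (not part of this commit) of how a caller is expected to drive the new FdManager/FdEntity interface and the AutoLock helper whose declarations appear in the diff above. It only uses methods declared in this patch (FdManager::get(), Open(), ExistOpen(), Close(), FdEntity::Write(), Flush()); the function name, the mutex, and the exact include set are illustrative assumptions, and headers_t comes from the existing s3fs headers.

// Illustrative sketch only -- not part of this patch.
#include <errno.h>
#include <pthread.h>
#include <string.h>

#include "fdcache.h"    // FdManager, FdEntity (declared in this patch)
#include "s3fs_util.h"  // AutoLock (declared in this patch)

static pthread_mutex_t example_lock = PTHREAD_MUTEX_INITIALIZER;   // hypothetical

// Overwrite the first bytes of an object and push the result back to S3.
static int example_overwrite_first_bytes(const char* path, const char* data, headers_t& meta)
{
  // RAII locking: the mutex is released when auto_lock leaves scope.
  AutoLock auto_lock(&example_lock);

  // ExistOpen() only returns an entity that is already open (no download).
  FdEntity* ent = FdManager::get()->ExistOpen(path);
  if(NULL == ent){
    // Otherwise open it; size/time of -1 mean "unknown", pages are loaded lazily.
    if(NULL == (ent = FdManager::get()->Open(path, -1, -1, false, true))){
      return -EIO;
    }
  }

  // Write() touches only the pages covering [0, strlen(data)); other pages
  // keep their "not loaded yet" status until they are actually needed.
  ssize_t wsize = ent->Write(data, 0, strlen(data));
  if(0 > wsize){
    FdManager::get()->Close(ent);
    return static_cast<int>(wsize);
  }

  // Flush() loads any still-missing pages and uploads the whole object
  // (in parallel multipart when the object is large enough).
  int result = ent->Flush(meta, /*ow_sse_flg=*/false, /*force_sync=*/true);

  // Close() drops the reference; the entity is destroyed once nothing holds it.
  FdManager::get()->Close(ent);
  return result;
}

This mirrors the rewritten s3fs_write()/s3fs_flush() handlers earlier in the diff: a write only marks and fills the affected pages, and the complete object is assembled and uploaded at flush/close time.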