diff --git a/src/Makefile.am b/src/Makefile.am
index 46ac6a2..a86be89 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -54,6 +54,7 @@ s3fs_SOURCES = \
         addhead.cpp \
         sighandlers.cpp \
         autolock.cpp \
+        threadpoolman.cpp \
         common_auth.cpp
 if USE_SSL_OPENSSL
 s3fs_SOURCES += openssl_auth.cpp
diff --git a/src/curl.cpp b/src/curl.cpp
index 8056d18..ee03776 100644
--- a/src/curl.cpp
+++ b/src/curl.cpp
@@ -1200,6 +1200,72 @@ int S3fsCurl::MapPutErrorResponse(int result)
     return result;
 }
 
+// [NOTE]
+// It is a factory method as utility because it requires an S3fsCurl object
+// initialized for multipart upload from outside this class.
+//
+S3fsCurl* S3fsCurl::CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result)
+{
+    // check parameters
+    if(!tpath || -1 == fd || start < 0 || size <= 0 || !petag){
+        S3FS_PRN_ERR("Parameters are wrong: tpath(%s), fd(%d), start(%lld), size(%lld), petag(%s)", SAFESTRPTR(tpath), fd, static_cast<long long>(start), static_cast<long long>(size), (petag ? "not null" : "null"));
+        result = -EIO;
+        return NULL;
+    }
+    result = 0;
+
+    S3fsCurl* s3fscurl = new S3fsCurl(true);
+
+    if(!is_copy){
+        s3fscurl->partdata.fd         = fd;
+        s3fscurl->partdata.startpos   = start;
+        s3fscurl->partdata.size       = size;
+        s3fscurl->partdata.is_copy    = is_copy;
+        s3fscurl->partdata.petag      = petag;      // [NOTE] be careful, the value is set directly
+        s3fscurl->b_partdata_startpos = s3fscurl->partdata.startpos;
+        s3fscurl->b_partdata_size     = s3fscurl->partdata.size;
+
+        S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(start), static_cast<long long>(size), part_num);
+
+        if(0 != (result = s3fscurl->UploadMultipartPostSetup(tpath, part_num, upload_id))){
+            S3FS_PRN_ERR("failed uploading part setup(%d)", result);
+            delete s3fscurl;
+            return NULL;
+        }
+    }else{
+        headers_t   meta;
+        std::string srcresource;
+        std::string srcurl;
+        MakeUrlResource(get_realpath(tpath).c_str(), srcresource, srcurl);
+        meta["x-amz-copy-source"] = srcresource;
+
+        std::ostringstream strrange;
+        strrange << "bytes=" << start << "-" << (start + size - 1);
+        meta["x-amz-copy-source-range"] = strrange.str();
+
+        s3fscurl->b_from         = SAFESTRPTR(tpath);
+        s3fscurl->b_meta         = meta;
+        s3fscurl->partdata.petag = petag;           // [NOTE] be careful, the value is set directly
+
+        S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(start), static_cast<long long>(size), part_num);
+
+        if(0 != (result = s3fscurl->CopyMultipartPostSetup(tpath, tpath, part_num, upload_id, meta))){
+            S3FS_PRN_ERR("failed uploading part setup(%d)", result);
+            delete s3fscurl;
+            return NULL;
+        }
+    }
+
+    // Call lazy function
+    if(!s3fscurl->fpLazySetup || !s3fscurl->fpLazySetup(s3fscurl)){
+        S3FS_PRN_ERR("failed lazy function setup for uploading part");
+        result = -EIO;
+        delete s3fscurl;
+        return NULL;
+    }
+    return s3fscurl;
+}
+
 int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd)
 {
     int result;
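The factory above is consumed by the per-part worker threads added in fdcache_fdinfo.cpp further down in this patch. A minimal sketch of that calling sequence, using only functions that appear in the patch; the helper name upload_one_part is illustrative, not part of the patch:

    // illustrative helper: how one part is sent with CreateParallelS3fsCurl
    static int upload_one_part(const char* tpath, int upload_fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id)
    {
        int       result   = 0;
        S3fsCurl* s3fscurl = S3fsCurl::CreateParallelS3fsCurl(tpath, upload_fd, start, size, part_num, is_copy, petag, upload_id, result);
        if(!s3fscurl){
            return result;                              // the factory already set an error code
        }
        if(0 == (result = s3fscurl->RequestPerform())){
            if(!s3fscurl->MixMultipartPostComplete()){  // stores the part's etag through petag
                result = -EIO;
            }
        }
        s3fscurl->DestroyCurlHandle(true, false);
        delete s3fscurl;
        return result;
    }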
diff --git a/src/curl.h b/src/curl.h
index ba87486..82dd1a5 100644
--- a/src/curl.h
+++ b/src/curl.h
@@ -45,7 +45,7 @@
 // CURLOPT_SSL_ENABLE_ALPN       7.36.0 and later
 // CURLOPT_KEEP_SENDING_ON_ERROR 7.51.0 and later
 //
-// s3fs uses these, if you build s3fs with the old libcurl, 
+// s3fs uses these, if you build s3fs with the old libcurl,
 // substitute the following symbols to avoid errors.
 // If the version of libcurl linked at runtime is old,
 // curl_easy_setopt results in an error(CURLE_UNKNOWN_OPTION) and
@@ -191,7 +191,7 @@ class S3fsCurl
         std::vector<pthread_t>* completed_tids;
         s3fscurl_lazy_setup     fpLazySetup;    // curl options for lazy setting function
         CURLcode                curlCode;       // handle curl return
-        
+
     public:
         static const long S3FSCURL_RESPONSECODE_NOTSET      = -1;
         static const long S3FSCURL_RESPONSECODE_FATAL_ERROR = -2;
@@ -230,7 +230,6 @@ class S3fsCurl
         static S3fsCurl* ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl);
 
         // lazy functions for set curl options
-        static bool UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl);
         static bool CopyMultipartPostSetCurlOpts(S3fsCurl* s3fscurl);
         static bool PreGetObjectRequestSetCurlOpts(S3fsCurl* s3fscurl);
         static bool PreHeadRequestSetCurlOpts(S3fsCurl* s3fscurl);
@@ -259,7 +258,6 @@ class S3fsCurl
         int CopyMultipartPostSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta);
         bool UploadMultipartPostComplete();
         bool CopyMultipartPostComplete();
-        bool MixMultipartPostComplete();
         int MapPutErrorResponse(int result);
 
     public:
@@ -268,10 +266,14 @@ class S3fsCurl
         static bool InitCredentialObject(S3fsCred* pcredobj);
         static bool InitMimeType(const std::string& strFile);
         static bool DestroyS3fsCurl();
+        static S3fsCurl* CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result);
         static int ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd);
         static int ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages);
         static int ParallelGetObjectRequest(const char* tpath, int fd, off_t start, off_t size);
 
+        // lazy functions for set curl options(public)
+        static bool UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl);
+
         // class methods(variables)
         static std::string LookupMimeType(const std::string& name);
         static bool SetCheckCertificate(bool isCertCheck);
@@ -357,6 +359,7 @@ class S3fsCurl
         int PreMultipartPostRequest(const char* tpath, headers_t& meta, std::string& upload_id, bool is_copy);
         int CompleteMultipartPostRequest(const char* tpath, const std::string& upload_id, etaglist_t& parts);
         int UploadMultipartPostRequest(const char* tpath, int part_num, const std::string& upload_id);
+        bool MixMultipartPostComplete();
         int MultipartListRequest(std::string& body);
         int AbortMultipartUpload(const char* tpath, const std::string& upload_id);
         int MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta, bool is_copy);
diff --git a/src/fdcache_entity.cpp b/src/fdcache_entity.cpp
index 12065c2..4ae67bd 100644
--- a/src/fdcache_entity.cpp
+++ b/src/fdcache_entity.cpp
@@ -43,6 +43,7 @@ static const int MAX_MULTIPART_CNT         = 10 * 1000; // S3 multipart max coun
 // FdEntity class variables
 //------------------------------------------------
 bool FdEntity::mixmultipart = true;
+bool FdEntity::streamupload = false;
 
 //------------------------------------------------
 // FdEntity class methods
@@ -54,6 +55,13 @@ bool FdEntity::SetNoMixMultipart()
     return old;
 }
 
+bool FdEntity::SetStreamUpload(bool isstream)
+{
+    bool old    = streamupload;
+    streamupload = isstream;
+    return old;
+}
+
 int FdEntity::FillFile(int fd, unsigned char byte, off_t size, off_t start)
 {
     unsigned char bytes[1024 * 32];         // 32kb
@@ -418,6 +426,13 @@ int FdEntity::Open(const headers_t* pmeta, off_t size, time_t time, int flags, A
 
     AutoLock auto_data_lock(&fdent_data_lock);
 
+    // [NOTE]
+    // When the file size is increased by truncation, the extended area must
+    // be kept as an untreated area, and that area is set to these variables.
+    //
+    off_t truncated_start = 0;
+    off_t truncated_size  = 0;
+
     if(-1 != physical_fd){
         //
         // already open file
@@ -436,6 +451,14 @@ int FdEntity::Open(const headers_t* pmeta, off_t size, time_t time, int flags, A
                 return -EIO;
             }
         }
+
+        // set untreated area
+        if(0 <= size && size_orgmeta < size){
+            truncated_start = size_orgmeta;
+            truncated_size  = size - size_orgmeta;
+        }
+
         // set original headers and set size.
         off_t new_size = (0 <= size ? size : size_orgmeta);
         if(pmeta){
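A worked example of what these variables capture, with illustrative sizes: if the stored object size (size_orgmeta) is 10MB and the file is opened with size=25MB, the extended tail holds no data yet and must be tracked as untreated:

    // illustrative values only
    off_t truncated_start = size_orgmeta;           // 10MB
    off_t truncated_size  = size - size_orgmeta;    // 15MB
    // later registered via ppseudoinfo->AddUntreated(truncated_start, truncated_size)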
@@ -614,6 +637,12 @@ int FdEntity::Open(const headers_t* pmeta, off_t size, time_t time, int flags, A
         size_orgmeta = 0;
     }
 
+    // set untreated area
+    if(0 <= size && size_orgmeta < size){
+        truncated_start = size_orgmeta;
+        truncated_size  = size - size_orgmeta;
+    }
+
     // set mtime and ctime(set "x-amz-meta-mtime" and "x-amz-meta-ctime" in orgmeta)
     if(-1 != time){
         struct timespec ts = {time, 0};
@@ -633,6 +662,18 @@ int FdEntity::Open(const headers_t* pmeta, off_t size, time_t time, int flags, A
     int pseudo_fd = ppseudoinfo->GetPseudoFd();
     pseudo_fd_map[pseudo_fd] = ppseudoinfo;
 
+    // if there is an untreated area, set it to the pseudo object.
+    if(0 < truncated_size){
+        if(!ppseudoinfo->AddUntreated(truncated_start, truncated_size)){
+            pseudo_fd_map.erase(pseudo_fd);
+            if(pfile){
+                fclose(pfile);
+                pfile = NULL;
+            }
+            delete ppseudoinfo;
+        }
+    }
+
     return pseudo_fd;
 }
 
@@ -1317,6 +1358,7 @@ int FdEntity::NoCacheCompleteMultipartPost(PseudoFdInfo* pseudo_obj)
     s3fscurl.DestroyCurlHandle();
 
     // clear multipart upload info
+    pseudo_obj->ClearUntreated();
     pseudo_obj->ClearUploadInfo(false);
 
     return 0;
@@ -1370,10 +1412,17 @@ int FdEntity::RowFlush(int fd, const char* tpath, bool force_sync)
         return 0;
     }
 
+    if(S3fsLog::IsS3fsLogDbg()){
+        pagelist.Dump();
+    }
+
     int result;
     if(nomultipart){
         // No multipart upload
         result = RowFlushNoMultipart(pseudo_obj, tpath);
+    }else if(FdEntity::streamupload){
+        // Stream multipart upload
+        result = RowFlushStreamMultipart(pseudo_obj, tpath);
     }else if(FdEntity::mixmultipart){
         // Mix multipart upload
         result = RowFlushMixMultipart(pseudo_obj, tpath);
@@ -1685,6 +1734,187 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
     return result;
 }
 
+// [NOTE]
+// Both fdent_lock and fdent_data_lock must be locked before calling.
+//
+int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
+{
+    S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d][mix_upload=%s]", SAFESTRPTR(tpath), path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, (FdEntity::mixmultipart ? "true" : "false"));
+
+    if(-1 == physical_fd || !pseudo_obj){
+        return -EBADF;
+    }
+    int result;
+
+    if(pagelist.Size() <= S3fsCurl::GetMultipartSize()){
+        //
+        // Use normal upload instead of multipart upload(too small part size)
+        //
+
+        // backup upload file size
+        struct stat st;
+        memset(&st, 0, sizeof(struct stat));
+        if(-1 == fstat(physical_fd, &st)){
+            S3FS_PRN_ERR("fstat is failed by errno(%d), but continue...", errno);
+        }
+
+        // If there are unloaded pages, they are loaded here.
+        if(0 != (result = Load(/*start=*/ 0, /*size=*/ 0, AutoLock::ALREADY_LOCKED))){
+            S3FS_PRN_ERR("failed to load parts before uploading object(%d)", result);
+            return result;
+        }
+
+        headers_t tmporgmeta = orgmeta;
+        S3fsCurl s3fscurl(true);
+        result = s3fscurl.PutRequest(path.c_str(), tmporgmeta, physical_fd);
+
+        // reset uploaded file size
+        size_orgmeta = st.st_size;
+
+        pseudo_obj->ClearUntreated();
+
+        if(0 == result){
+            pagelist.ClearAllModified();
+        }
+
+    }else{
+        //
+        // Make upload/download/copy/cancel lists from file
+        //
+        mp_part_list_t  to_upload_list;
+        mp_part_list_t  to_copy_list;
+        mp_part_list_t  to_download_list;
+        filepart_list_t cancel_uploaded_list;
+        if(!pseudo_obj->ExtractUploadPartsFromAllArea(to_upload_list, to_copy_list, to_download_list, cancel_uploaded_list, S3fsCurl::GetMultipartSize(), pagelist.Size(), FdEntity::mixmultipart)){
+            S3FS_PRN_ERR("Failed to extract various upload parts list from all area: errno(EIO)");
+            return -EIO;
+        }
+
+        //
+        // Check total size for downloading and Download
+        //
+        total_mp_part_list mptoal;
+        off_t total_download_size = mptoal(to_download_list);
+        if(0 < total_download_size){
+            //
+            // Check if there is enough free disk space for the total download size
+            //
+            if(!ReserveDiskSpace(total_download_size)){
+                // not enough disk space
+                //
+                // [NOTE]
+                // Because there is no space left to download into, we cannot recover
+                // from this in the sequential uploading case.
+                //
+                S3FS_PRN_WARN("Not enough local storage(%lld byte) to cache write request for whole of the file: [path=%s][physical_fd=%d]", static_cast<long long>(total_download_size), path.c_str(), physical_fd);
+                return -ENOSPC;     // No space left on device
+            }
+            // enough disk space
+
+            //
+            // Download all parts
+            //
+            // [TODO]
+            // Execute the downloads in parallel with multiple threads.
+            //
+            for(mp_part_list_t::const_iterator download_iter = to_download_list.begin(); download_iter != to_download_list.end(); ++download_iter){
+                if(0 != (result = Load(download_iter->start, download_iter->size, AutoLock::ALREADY_LOCKED))){
+                    break;
+                }
+            }
+            FdManager::FreeReservedDiskSpace(total_download_size);
+            if(0 != result){
+                S3FS_PRN_ERR("failed to load uninitialized area before writing(errno=%d)", result);
+                return result;
+            }
+        }
+
+        //
+        // Has multipart uploading already started?
+        //
+        if(!pseudo_obj->IsUploading()){
+            //
+            // Multipart uploading hasn't started yet, so start it.
+            //
+            S3fsCurl    s3fscurl(true);
+            std::string upload_id;
+            if(0 != (result = s3fscurl.PreMultipartPostRequest(path.c_str(), orgmeta, upload_id, true))){
+                S3FS_PRN_ERR("failed to setup multipart upload(create upload id) by errno(%d)", result);
+                return result;
+            }
+            if(!pseudo_obj->InitialUploadInfo(upload_id)){
+                S3FS_PRN_ERR("failed to setup multipart upload(set upload id to object)");
+                return -EIO;
+            }
+
+            // Clear the dirty flag, because the meta data is updated.
+            is_meta_pending = false;
+        }
+
+        //
+        // Output debug level information
+        //
+        // When canceling(overwriting) a part that has already been uploaded, output it.
+        //
+        if(S3fsLog::IsS3fsLogDbg()){
+            for(filepart_list_t::const_iterator cancel_iter = cancel_uploaded_list.begin(); cancel_iter != cancel_uploaded_list.end(); ++cancel_iter){
+                S3FS_PRN_DBG("Cancel uploaded: start(%lld), size(%lld), part number(%d)", static_cast<long long>(cancel_iter->startpos), static_cast<long long>(cancel_iter->size), (cancel_iter->petag ? cancel_iter->petag->part_num : -1));
+            }
+        }
+
+        //
+        // Upload multipart and copy parts, and wait for them to finish
+        //
+        if(!pseudo_obj->ParallelMultipartUploadAll(path.c_str(), to_upload_list, to_copy_list, result)){
+            S3FS_PRN_ERR("Failed to upload multipart parts.");
+            pseudo_obj->ClearUntreated();
+            pseudo_obj->ClearUploadInfo(false);     // clear multipart upload info
+            return -EIO;
+        }
+        if(0 != result){
+            S3FS_PRN_ERR("An error(%d) occurred in some threads that were uploading parallel multiparts, but continue to clean up...", result);
+            pseudo_obj->ClearUntreated();
+            pseudo_obj->ClearUploadInfo(false);     // clear multipart upload info
+            return result;
+        }
+
+        //
+        // Complete uploading
+        //
+        std::string upload_id;
+        etaglist_t  etaglist;
+        if(!pseudo_obj->GetUploadId(upload_id) || !pseudo_obj->GetEtaglist(etaglist)){
+            S3FS_PRN_ERR("There is no upload id or etag list.");
+            pseudo_obj->ClearUntreated();
+            pseudo_obj->ClearUploadInfo(false);     // clear multipart upload info
+            return -EIO;
+        }else{
+            S3fsCurl s3fscurl(true);
+            if(0 != (result = s3fscurl.CompleteMultipartPostRequest(path.c_str(), upload_id, etaglist))){
+                S3FS_PRN_ERR("failed to complete multipart upload by errno(%d)", result);
+                pseudo_obj->ClearUntreated();
+                pseudo_obj->ClearUploadInfo(false); // clear multipart upload info
+                return result;
+            }
+            s3fscurl.DestroyCurlHandle();
+        }
+        pseudo_obj->ClearUntreated();
+        pseudo_obj->ClearUploadInfo(false);         // clear multipart upload info
+
+        // put pending headers
+        if(0 != (result = UploadPendingMeta())){
+            return result;
+        }
+    }
+    pseudo_obj->ClearUntreated();
+
+    if(0 == result){
+        pagelist.ClearAllModified();
+    }
+
+    return result;
+}
+
 // [NOTICE]
 // Need to lock before calling this method.
 bool FdEntity::ReserveDiskSpace(off_t size)
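Note the disk-space accounting pattern the download phase above relies on: ReserveDiskSpace and FdManager::FreeReservedDiskSpace must stay paired, and early returns are only taken before the reservation or after the release. A reduced sketch of that invariant, using only names from this patch:

    off_t total_download_size = /* sum of to_download_list sizes */ 0;
    if(0 < total_download_size){
        if(!ReserveDiskSpace(total_download_size)){
            return -ENOSPC;                     // nothing reserved yet, safe to return
        }
        // ... Load() each area in to_download_list, remembering the first error ...
        FdManager::FreeReservedDiskSpace(total_download_size);  // always released here
    }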
@@ -1798,6 +2028,12 @@ ssize_t FdEntity::Write(int fd, const char* bytes, off_t start, size_t size)
             S3FS_PRN_ERR("failed to truncate temporary file(physical_fd=%d).", physical_fd);
             return -errno;
         }
+        // set untreated area
+        if(!pseudo_obj->AddUntreated(pagelist.Size(), (start - pagelist.Size()))){
+            S3FS_PRN_ERR("failed to set untreated area by incremental.");
+            return -EIO;
+        }
+
         // add new area
         pagelist.SetPageLoadedStatus(pagelist.Size(), start - pagelist.Size(), PageList::PAGE_MODIFIED);
     }
@@ -1806,6 +2042,9 @@ ssize_t FdEntity::Write(int fd, const char* bytes, off_t start, size_t size)
     if(nomultipart){
         // No multipart upload
         wsize = WriteNoMultipart(pseudo_obj, bytes, start, size);
+    }else if(FdEntity::streamupload){
+        // Stream upload
+        wsize = WriteStreamUpload(pseudo_obj, bytes, start, size);
     }else if(FdEntity::mixmultipart){
         // Mix multipart upload
         wsize = WriteMixMultipart(pseudo_obj, bytes, start, size);
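A worked example of the sparse-extension handling just above: if the cached file currently ends at pagelist.Size()=20MB and a write arrives at start=32MB, the 12MB hole is recorded as untreated and modified before the new data is written. Illustrative values only:

    off_t file_end = 20LL * 1024 * 1024;        // pagelist.Size()
    off_t start    = 32LL * 1024 * 1024;        // incoming write offset
    // the 12MB gap becomes an untreated, modified area
    pseudo_obj->AddUntreated(file_end, start - file_end);
    pagelist.SetPageLoadedStatus(file_end, start - file_end, PageList::PAGE_MODIFIED);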
@@ -2055,6 +2294,54 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes,
     return wsize;
 }
 
+//
+// On Stream upload, the uploading is executed in another thread when the
+// written area exceeds the maximum size of multipart upload.
+//
+// [NOTE]
+// Both fdent_lock and fdent_data_lock must be locked before calling.
+//
+ssize_t FdEntity::WriteStreamUpload(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size)
+{
+    S3FS_PRN_DBG("[path=%s][pseudo_fd=%d][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, static_cast<long long>(start), size);
+
+    if(-1 == physical_fd || !pseudo_obj){
+        S3FS_PRN_ERR("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not writable", (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, path.c_str());
+        return -EBADF;
+    }
+
+    // Writing
+    ssize_t wsize;
+    if(-1 == (wsize = pwrite(physical_fd, bytes, size, start))){
+        S3FS_PRN_ERR("pwrite failed. errno(%d)", errno);
+        return -errno;
+    }
+    if(0 < wsize){
+        pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED);
+        pseudo_obj->AddUntreated(start, wsize);
+    }
+
+    // Check and Upload
+    //
+    // If the last updated Untreated area exceeds the maximum upload size,
+    // upload processing is performed.
+    //
+    headers_t tmporgmeta  = orgmeta;
+    bool      isuploading = pseudo_obj->IsUploading();
+    int       result;
+    if(0 != (result = pseudo_obj->UploadBoundaryLastUntreatedArea(path.c_str(), tmporgmeta))){
+        S3FS_PRN_ERR("Failed to upload the last untreated parts(area) : result=%d", result);
+        return result;
+    }
+
+    if(!isuploading && pseudo_obj->IsUploading()){
+        // Clear the dirty flag, because the meta data is updated.
+        is_meta_pending = false;
+    }
+
+    return wsize;
+}
+
 // [NOTE]
 // Returns true if merged to orgmeta.
 // If true is returned, the caller can update the header.
diff --git a/src/fdcache_entity.h b/src/fdcache_entity.h
index bb0c727..9379fc5 100644
--- a/src/fdcache_entity.h
+++ b/src/fdcache_entity.h
@@ -44,6 +44,7 @@ class FdEntity
         };
 
         static bool mixmultipart;   // whether multipart uploading can use copy api.
+        static bool streamupload;   // whether stream uploading.
 
         pthread_mutex_t fdent_lock;
         bool            is_lock_init;
@@ -81,13 +82,17 @@ class FdEntity
         int RowFlushNoMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
         int RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
         int RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
+        int RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
         ssize_t WriteNoMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
         ssize_t WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
         ssize_t WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
+        ssize_t WriteStreamUpload(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
 
     public:
         static bool GetNoMixMultipart() { return mixmultipart; }
         static bool SetNoMixMultipart();
+        static bool GetStreamUpload() { return streamupload; }
+        static bool SetStreamUpload(bool isstream);
 
         explicit FdEntity(const char* tpath = NULL, const char* cpath = NULL);
         ~FdEntity();
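To see when WriteStreamUpload actually triggers an upload, assume the default 10MB part size and a sequential writer whose last untreated area has grown to (start=0, size=24MB). UploadBoundaryLastUntreatedArea, defined in fdcache_fdinfo.cpp below, aligns that area to part boundaries, uploads only whole parts, and keeps the remainder for the next round. Illustrative arithmetic following the patch's formulas:

    // max_mp_size = 10MB, last untreated area = (0, 24MB)
    // aligned_start = ((0 / 10MB) + 0) * 10MB          = 0   (already on a boundary)
    // aligned_size  = ((24MB - 0) / 10MB) * 10MB       = 20MB
    // uploaded:  [0, 10MB) as part 1, [10MB, 20MB) as part 2
    // remainder: (20MB, 4MB) stays in the untreated list until more data arrives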
diff --git a/src/fdcache_fdinfo.cpp b/src/fdcache_fdinfo.cpp
index 03be568..00a717c 100644
--- a/src/fdcache_fdinfo.cpp
+++ b/src/fdcache_fdinfo.cpp
@@ -20,18 +20,108 @@
 
 #include <cstdio>
 #include <cstdlib>
+#include <cerrno>
 #include <unistd.h>
+#include <stdint.h>
+#include <sys/types.h>
+#include <sys/stat.h>
 
 #include "common.h"
 #include "s3fs.h"
 #include "fdcache_fdinfo.h"
 #include "fdcache_pseudofd.h"
 #include "autolock.h"
+#include "curl.h"
+#include "string_util.h"
+#include "threadpoolman.h"
+
+//------------------------------------------------
+// PseudoFdInfo class variables
+//------------------------------------------------
+int PseudoFdInfo::max_threads     = -1;
+int PseudoFdInfo::opt_max_threads = -1;
+
+//------------------------------------------------
+// PseudoFdInfo class methods
+//------------------------------------------------
+//
+// Worker function for uploading
+//
+void* PseudoFdInfo::MultipartUploadThreadWorker(void* arg)
+{
+    pseudofdinfo_thparam* pthparam = static_cast<pseudofdinfo_thparam*>(arg);
+    if(!pthparam || !(pthparam->ppseudofdinfo)){
+        return (void*)(intptr_t)(-EIO);
+    }
+    S3FS_PRN_INFO3("Upload Part Thread [tpath=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
+
+    if(0 != pthparam->ppseudofdinfo->last_result){
+        S3FS_PRN_DBG("Already occurred error, thus this thread worker is exiting.");
+
+        AutoLock auto_lock(&(pthparam->ppseudofdinfo->upload_list_lock));
+
+        if(0 < pthparam->ppseudofdinfo->instruct_count){
+            --(pthparam->ppseudofdinfo->instruct_count);
+        }
+        ++(pthparam->ppseudofdinfo->completed_count);
+
+        return (void*)(intptr_t)(pthparam->ppseudofdinfo->last_result);
+    }
+
+    // setup and make curl object
+    int result = 0;
+    S3fsCurl* s3fscurl;
+    if(NULL == (s3fscurl = S3fsCurl::CreateParallelS3fsCurl(pthparam->path.c_str(), pthparam->upload_fd, pthparam->start, pthparam->size, pthparam->part_num, pthparam->is_copy, pthparam->petag, pthparam->upload_id, result))){
+        S3FS_PRN_ERR("failed creating s3fs curl object for uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
+
+        // set result for exiting
+        AutoLock auto_lock(&(pthparam->ppseudofdinfo->upload_list_lock));
+
+        if(0 < pthparam->ppseudofdinfo->instruct_count){
+            --(pthparam->ppseudofdinfo->instruct_count);
+        }
+        ++(pthparam->ppseudofdinfo->completed_count);
+
+        if(0 != result){
+            pthparam->ppseudofdinfo->last_result = result;
+        }
+        return (void*)(intptr_t)(result);
+    }
+
+    // Send request and get result
+    if(0 == (result = s3fscurl->RequestPerform())){
+        S3FS_PRN_DBG("succeed uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
+        if(!s3fscurl->MixMultipartPostComplete()){
+            S3FS_PRN_ERR("failed completion uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
+            result = -EIO;
+        }
+    }else{
+        S3FS_PRN_ERR("failed uploading with error(%d) [path=%s][start=%lld][size=%lld][part=%d]", result, pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
+    }
+    s3fscurl->DestroyCurlHandle(true, false);
+    delete s3fscurl;
+
+    // set result
+    {
+        AutoLock auto_lock(&(pthparam->ppseudofdinfo->upload_list_lock));
+
+        if(0 < pthparam->ppseudofdinfo->instruct_count){
+            --(pthparam->ppseudofdinfo->instruct_count);
+        }
+        ++(pthparam->ppseudofdinfo->completed_count);
+
+        if(0 != result){
+            pthparam->ppseudofdinfo->last_result = result;
+        }
+    }
+    return (void*)(intptr_t)(result);
+}
 
 //------------------------------------------------
 // PseudoFdInfo methods
 //------------------------------------------------
-PseudoFdInfo::PseudoFdInfo(int fd, int open_flags) : pseudo_fd(-1), physical_fd(fd), flags(0) //, is_lock_init(false)
+PseudoFdInfo::PseudoFdInfo(int fd, int open_flags) : pseudo_fd(-1), physical_fd(fd), flags(0), upload_fd(-1), uploaded_sem(0), instruct_count(0), completed_count(0), last_result(0)
 {
     pthread_mutexattr_t attr;
     pthread_mutexattr_init(&attr);
@@ -72,6 +162,47 @@ bool PseudoFdInfo::Clear()
     pseudo_fd   = -1;
     physical_fd = -1;
 
+    CloseUploadFd(true);    // [NOTE] the mutex is already destroyed here, so do not lock it.
+
+    return true;
+}
+
+void PseudoFdInfo::CloseUploadFd(bool lock_already_held)
+{
+    AutoLock auto_lock(&upload_list_lock, lock_already_held ? AutoLock::ALREADY_LOCKED : AutoLock::NONE);
+
+    if(-1 != upload_fd){
+        close(upload_fd);
+    }
+}
+
+bool PseudoFdInfo::OpenUploadFd(bool lock_already_held)
+{
+    AutoLock auto_lock(&upload_list_lock, lock_already_held ? AutoLock::ALREADY_LOCKED : AutoLock::NONE);
+
+    if(-1 != upload_fd){
+        // already initialized
+        return true;
+    }
+    if(-1 == physical_fd){
+        S3FS_PRN_ERR("physical_fd is not initialized yet.");
+        return false;
+    }
+
+    // duplicate fd
+    if(-1 == (upload_fd = dup(physical_fd)) || 0 != lseek(upload_fd, 0, SEEK_SET)){
+        S3FS_PRN_ERR("Could not duplicate physical file descriptor(errno=%d)", errno);
+        if(-1 != upload_fd){
+            close(upload_fd);
+        }
+        return false;
+    }
+    struct stat st;
+    if(-1 == fstat(upload_fd, &st)){
+        S3FS_PRN_ERR("Invalid file descriptor for uploading(errno=%d)", errno);
+        close(upload_fd);
+        return false;
+    }
 
     return true;
 }
@@ -114,23 +245,23 @@ bool PseudoFdInfo::ClearUploadInfo(bool is_cancel_mp, bool lock_already_held)
 
     if(is_cancel_mp){
         // [TODO]
-        // If we have any uploaded parts, we should delete them here.
-        // We haven't implemented it yet, but it will be implemented in the future.
-        // (User can delete them in the utility mode of s3fs.)
+        // If processing for cancellation is required, it will be processed here. Not implemented yet.
         //
-        S3FS_PRN_INFO("Implementation of cancellation process for multipart upload is awaited.");
+        S3FS_PRN_DBG("If processing for cancellation is required, it will be processed here.");
     }
 
     upload_id.erase();
     upload_list.clear();
-    ClearUntreated(true);
+    instruct_count  = 0;
+    completed_count = 0;
+    last_result     = 0;
 
     return true;
 }
 
-bool PseudoFdInfo::InitialUploadInfo(const std::string& id)
+bool PseudoFdInfo::InitialUploadInfo(const std::string& id, bool lock_already_held)
 {
-    AutoLock auto_lock(&upload_list_lock);
+    AutoLock auto_lock(&upload_list_lock, lock_already_held ? AutoLock::ALREADY_LOCKED : AutoLock::NONE);
 
     if(!ClearUploadInfo(true, true)){
         return false;
@@ -225,13 +356,6 @@ bool PseudoFdInfo::ClearUntreated(off_t start, off_t size)
     return untreated_list.ClearParts(start, size);
 }
 
-bool PseudoFdInfo::GetUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size)
-{
-    AutoLock auto_lock(&upload_list_lock);
-
-    return untreated_list.GetPart(start, size, max_size, min_size);
-}
-
 bool PseudoFdInfo::GetLastUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size)
 {
     AutoLock auto_lock(&upload_list_lock);
@@ -243,7 +367,684 @@ bool PseudoFdInfo::AddUntreated(off_t start, off_t size)
 {
     AutoLock auto_lock(&upload_list_lock);
 
-    return untreated_list.AddPart(start, size);
+    bool result = untreated_list.AddPart(start, size);
+    if(!result){
+        S3FS_PRN_DBG("Failed adding untreated area part.");
+    }else if(S3fsLog::IsS3fsLogDbg()){
+        untreated_list.Dump();
+    }
+
+    return result;
+}
+
+bool PseudoFdInfo::GetLastUpdateUntreatedPart(off_t& start, off_t& size)
+{
+    // Get the last untreated area
+    if(!untreated_list.GetLastUpdatePart(start, size)){
+        return false;
+    }
+    return true;
+}
+
+bool PseudoFdInfo::ReplaceLastUpdateUntreatedPart(off_t front_start, off_t front_size, off_t behind_start, off_t behind_size)
+{
+    if(0 < front_size){
+        if(!untreated_list.ReplaceLastUpdatePart(front_start, front_size)){
+            return false;
+        }
+    }else{
+        if(!untreated_list.RemoveLastUpdatePart()){
+            return false;
+        }
+    }
+    if(0 < behind_size){
+        if(!untreated_list.AddPart(behind_start, behind_size)){
+            return false;
+        }
+    }
+    return true;
+}
+
+//
+// Utility for sorting upload list
+//
+static bool filepart_partnum_compare(const filepart& src1, const filepart& src2)
+{
+    return (src1.get_part_number() < src2.get_part_number());
+}
+
+bool PseudoFdInfo::InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag, bool lock_already_held)
+{
+    //S3FS_PRN_DBG("[start=%lld][size=%lld][part_num=%d][is_copy=%s]", static_cast<long long>(start), static_cast<long long>(size), part_num, (is_copy ? "true" : "false"));
+
+    if(!IsUploading()){
+        S3FS_PRN_ERR("Multipart Upload has not started yet.");
+        return false;
+    }
+    if(start < 0 || size <= 0 || part_num < 0 || !ppetag){
+        S3FS_PRN_ERR("Parameters are wrong.");
+        return false;
+    }
+
+    AutoLock auto_lock(&upload_list_lock, lock_already_held ? AutoLock::ALREADY_LOCKED : AutoLock::NONE);
+
+    // insert new part
+    etag_entities.push_back(etagpair(NULL, part_num));
+    etagpair& etag_entity = etag_entities.back();
+    filepart  newpart(false, physical_fd, start, size, is_copy, &etag_entity);
+    upload_list.push_back(newpart);
+
+    // sort by part number
+    upload_list.sort(filepart_partnum_compare);
+
+    // set etag pointer
+    *ppetag = &etag_entity;
+
+    return true;
+}
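The dispatch in ParallelMultipartUpload below hands each part to the thread pool with the parameter pattern sketched here. The sketch assumes ThreadPoolMan posts psem once per finished worker (WaitAllThreadsExit depends on exactly that pairing):

    pseudofdinfo_thparam* thargs = new pseudofdinfo_thparam;    // per-part arguments
    thpoolman_param* ppoolparam  = new thpoolman_param;
    ppoolparam->args  = thargs;
    ppoolparam->psem  = &uploaded_sem;                          // posted when the worker finishes
    ppoolparam->pfunc = PseudoFdInfo::MultipartUploadThreadWorker;
    if(ThreadPoolMan::Instruct(ppoolparam)){
        ++instruct_count;       // balanced by the worker's --instruct_count/++completed_count
    }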
+
+// [NOTE]
+// This method only launches the upload threads.
+// Check the maximum number of threads before calling.
+//
+bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy)
+{
+    //S3FS_PRN_DBG("[path=%s][mplist(%zu)]", SAFESTRPTR(path), mplist.size());
+
+    if(mplist.empty()){
+        // nothing to do
+        return true;
+    }
+    if(!OpenUploadFd(true)){
+        return false;
+    }
+
+    for(mp_part_list_t::const_iterator iter = mplist.begin(); iter != mplist.end(); ++iter){
+        // Insert upload part
+        etagpair* petag = NULL;
+        if(!InsertUploadPart(iter->start, iter->size, iter->part_num, is_copy, &petag, true)){
+            S3FS_PRN_ERR("Failed to insert upload part(path=%s, start=%lld, size=%lld, part=%d, copy=%s) to mplist", SAFESTRPTR(path), static_cast<long long>(iter->start), static_cast<long long>(iter->size), iter->part_num, (is_copy ? "true" : "false"));
+            return false;
+        }
+
+        // make parameter for worker thread
+        pseudofdinfo_thparam* thargs = new pseudofdinfo_thparam;
+        thargs->ppseudofdinfo        = this;
+        thargs->path                 = SAFESTRPTR(path);
+        thargs->upload_id            = upload_id;
+        thargs->upload_fd            = upload_fd;
+        thargs->start                = iter->start;
+        thargs->size                 = iter->size;
+        thargs->is_copy              = is_copy;
+        thargs->part_num             = iter->part_num;
+        thargs->petag                = petag;
+
+        // make parameter for thread pool
+        thpoolman_param* ppoolparam  = new thpoolman_param;
+        ppoolparam->args             = thargs;
+        ppoolparam->psem             = &uploaded_sem;
+        ppoolparam->pfunc            = PseudoFdInfo::MultipartUploadThreadWorker;
+
+        // setup instruction
+        if(!ThreadPoolMan::Instruct(ppoolparam)){
+            S3FS_PRN_ERR("failed setup instruction for uploading.");
+            delete ppoolparam;
+            delete thargs;
+            return false;
+        }
+        ++instruct_count;
+    }
+    return true;
+}
+
+bool PseudoFdInfo::ParallelMultipartUploadAll(const char* path, const mp_part_list_t& upload_list, const mp_part_list_t& copy_list, int& result)
+{
+    S3FS_PRN_DBG("[path=%s][upload_list(%zu)][copy_list(%zu)]", SAFESTRPTR(path), upload_list.size(), copy_list.size());
+
+    result = 0;
+
+    if(!OpenUploadFd(true)){
+        return false;
+    }
+
+    if(!ParallelMultipartUpload(path, upload_list, false) || !ParallelMultipartUpload(path, copy_list, true)){
+        S3FS_PRN_ERR("Failed setup instruction for uploading(path=%s, upload_list=%zu, copy_list=%zu).", SAFESTRPTR(path), upload_list.size(), copy_list.size());
+        return false;
+    }
+
+    // Wait for all threads to exit
+    result = WaitAllThreadsExit();
+
+    return true;
+}
+
+//
+// Upload the last updated Untreated area
+//
+// [Overview]
+// Uploads untreated areas with the maximum multipart upload size as the
+// boundary.
+//
+// * The starting position of the untreated area is aligned with the maximum
+//   multipart upload size as the boundary.
+// * If there is an uploaded area that overlaps with the aligned untreated
+//   area, that uploaded area is canceled and absorbed by the untreated area.
+// * Upload only when the aligned untreated area exceeds the maximum multipart
+//   upload size.
+// * When the start position of the untreated area is moved backward to the
+//   boundary alignment and a gap area remains, that gap is left as an
+//   untreated area.
+//
+ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_t& meta)
+{
+    S3FS_PRN_DBG("[path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(path), pseudo_fd, physical_fd);
+
+    if(!path || -1 == physical_fd || -1 == pseudo_fd){
+        S3FS_PRN_ERR("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not writable", pseudo_fd, physical_fd, path);
+        return -EBADF;
+    }
+    AutoLock auto_lock(&upload_list_lock);
+
+    //
+    // Get the last updated untreated area
+    //
+    off_t last_untreated_start = 0;
+    off_t last_untreated_size  = 0;
+    if(!GetLastUpdateUntreatedPart(last_untreated_start, last_untreated_size) || last_untreated_start < 0 || last_untreated_size <= 0){
+        S3FS_PRN_WARN("Not found last update untreated area or it is empty, thus return without any error.");
+        return 0;
+    }
+
+    //
+    // Align the start position of the last updated untreated area with the boundary
+    //
+    // * Align the last updated untreated area with the maximum upload size boundary.
+    // * The remaining size of the part before the boundary will not be uploaded.
+    //
+    off_t max_mp_size   = S3fsCurl::GetMultipartSize();
+    off_t aligned_start = ((last_untreated_start / max_mp_size) + (0 < (last_untreated_start % max_mp_size) ? 1 : 0)) * max_mp_size;
+    if((last_untreated_start + last_untreated_size) <= aligned_start){
+        S3FS_PRN_INFO("After the untreated area(start=%lld, size=%lld) is aligned with the boundary, the aligned start(%lld) exceeds the untreated area, so there is nothing to do.", static_cast<long long>(last_untreated_start), static_cast<long long>(last_untreated_size), static_cast<long long>(aligned_start));
+        return 0;
+    }
+
+    off_t aligned_size = (((last_untreated_start + last_untreated_size) - aligned_start) / max_mp_size) * max_mp_size;
+    if(0 == aligned_size){
+        S3FS_PRN_DBG("After the untreated area(start=%lld, size=%lld) is aligned with the boundary(start is %lld), the aligned size is empty, so nothing to do.", static_cast<long long>(last_untreated_start), static_cast<long long>(last_untreated_size), static_cast<long long>(aligned_start));
+        return 0;
+    }
+
+    off_t front_rem_start = last_untreated_start;                   // start of the remainder untreated area in front of the boundary
+    off_t front_rem_size  = aligned_start - last_untreated_start;   // size of the remainder untreated area in front of the boundary
+
+    //
+    // Get the area for uploading, if the last updated untreated area can be uploaded.
+    //
+    // [NOTE]
+    // * Create the upload area list, if the untreated area aligned with the boundary
+    //   exceeds the maximum upload size.
+    // * If it overlaps with an area that has already been uploaded(uploaded list),
+    //   that area is added to the cancellation list and included in the untreated area.
+    //
+    mp_part_list_t  to_upload_list;
+    filepart_list_t cancel_uploaded_list;
+    if(!ExtractUploadPartsFromUntreatedArea(aligned_start, aligned_size, to_upload_list, cancel_uploaded_list, S3fsCurl::GetMultipartSize())){
+        S3FS_PRN_ERR("Failed to extract upload parts from last untreated area.");
+        return -EIO;
+    }
+    if(to_upload_list.empty()){
+        S3FS_PRN_INFO("There is nothing to upload. In most cases, the untreated area does not meet the upload size.");
+        return 0;
+    }
+
+    //
+    // Has multipart uploading already started?
+    //
+    if(!IsUploading()){
+        //
+        // Multipart uploading hasn't started yet, so start it.
+        //
+        S3fsCurl    s3fscurl(true);
+        std::string upload_id;
+        int         result;
+        if(0 != (result = s3fscurl.PreMultipartPostRequest(path, meta, upload_id, true))){
+            S3FS_PRN_ERR("failed to setup multipart upload(create upload id) by errno(%d)", result);
+            return result;
+        }
+        if(!InitialUploadInfo(upload_id, true)){
+            S3FS_PRN_ERR("failed to setup multipart upload(set upload id to object)");
+            return -EIO;
+        }
+    }
+
+    //
+    // Output debug level information
+    //
+    // When canceling(overwriting) a part that has already been uploaded, output it.
+    //
+    if(S3fsLog::IsS3fsLogDbg()){
+        for(filepart_list_t::const_iterator cancel_iter = cancel_uploaded_list.begin(); cancel_iter != cancel_uploaded_list.end(); ++cancel_iter){
+            S3FS_PRN_DBG("Cancel uploaded: start(%lld), size(%lld), part number(%d)", static_cast<long long>(cancel_iter->startpos), static_cast<long long>(cancel_iter->size), (cancel_iter->petag ? cancel_iter->petag->part_num : -1));
+        }
+    }
+
+    //
+    // Upload Multipart parts
+    //
+    if(!ParallelMultipartUpload(path, to_upload_list, false)){
+        S3FS_PRN_ERR("Failed to upload multipart parts.");
+        return -EIO;
+    }
+
+    //
+    // Exclude the uploaded Untreated area and update the last Untreated area.
+    //
+    off_t behind_rem_start = aligned_start + aligned_size;
+    off_t behind_rem_size  = (last_untreated_start + last_untreated_size) - behind_rem_start;
+
+    if(!ReplaceLastUpdateUntreatedPart(front_rem_start, front_rem_size, behind_rem_start, behind_rem_size)){
+        S3FS_PRN_WARN("The last untreated area could not be detected and the uploaded area could not be excluded from it, but continue because it does not affect the overall processing.");
+    }
+
+    return 0;
+}
+
+int PseudoFdInfo::WaitAllThreadsExit()
+{
+    while(true){
+        {
+            AutoLock auto_lock(&upload_list_lock);
+            if(0 == instruct_count && 0 == completed_count){
+                break;
+            }
+
+            while(uploaded_sem.try_wait()){
+                if(0 < completed_count){
+                    --completed_count;
+                }
+            }
+            if(0 == instruct_count && 0 == completed_count){
+                break;
+            }
+        }
+
+        // need to wait for a worker to exit
+        uploaded_sem.wait();
+        {
+            AutoLock auto_lock(&upload_list_lock);
+            if(0 < completed_count){
+                --completed_count;
+            }
+        }
+    }
+    return last_result;
+}
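A worked trace of the accounting above for three dispatched parts, assuming (as above) that the pool posts uploaded_sem once per finished worker: each Instruct() does ++instruct_count, and each worker, under upload_list_lock, does --instruct_count and ++completed_count. WaitAllThreadsExit therefore consumes exactly one semaphore post per completed worker:

    // illustrative timeline for 3 parts
    // after dispatch:    instruct_count=3, completed_count=0, sem=0
    // 2 workers finish:  instruct_count=1, completed_count=2, sem=2
    // try_wait drains sem twice -> completed_count=0; instruct_count!=0, so wait()
    // last worker ends:  instruct_count=0, completed_count=1, sem=1 -> wait() returns
    // final pass: completed_count -> 0, both counters 0 -> return last_result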
+
+//
+// Extract the list for multipart upload from the Untreated Area
+//
+// The untreated_start parameter must be set aligning it with the boundaries
+// of the maximum multipart upload size. This method expects it to be bounded.
+//
+// This method creates the upload area aligned from the untreated area by
+// maximum size and creates the required list.
+// If it overlaps with an area that has already been uploaded, the overlapped
+// upload area will be canceled and absorbed by the untreated area.
+// If the list creation process is complete and areas smaller than the maximum
+// size remain, those areas will be reset to untreated_start and untreated_size
+// and returned to the caller.
+// If the called untreated area is smaller than the maximum size of the
+// multipart upload, no list will be created.
+//
+// [NOTE]
+// The maximum multipart upload size must be the upload boundary.
+//
+bool PseudoFdInfo::ExtractUploadPartsFromUntreatedArea(off_t& untreated_start, off_t& untreated_size, mp_part_list_t& to_upload_list, filepart_list_t& cancel_upload_list, off_t max_mp_size)
+{
+    if(untreated_start < 0 || untreated_size <= 0){
+        S3FS_PRN_ERR("Parameters are wrong(untreated_start=%lld, untreated_size=%lld).", static_cast<long long>(untreated_start), static_cast<long long>(untreated_size));
+        return false;
+    }
+
+    // Initialize lists
+    to_upload_list.clear();
+    cancel_upload_list.clear();
+
+    //
+    // Align start position with maximum multipart upload boundaries
+    //
+    off_t aligned_start = (untreated_start / max_mp_size) * max_mp_size;
+    off_t aligned_size  = untreated_size + (untreated_start - aligned_start);
+
+    //
+    // Check aligned untreated size
+    //
+    if(aligned_size < max_mp_size){
+        S3FS_PRN_INFO("untreated area(start=%lld, size=%lld) to aligned boundary(start=%lld, size=%lld) is smaller than max mp size(%lld), so nothing to do.", static_cast<long long>(untreated_start), static_cast<long long>(untreated_size), static_cast<long long>(aligned_start), static_cast<long long>(aligned_size), static_cast<long long>(max_mp_size));
+        return true;    // successful termination
+    }
+
+    //
+    // Check each uploaded area in the list
+    //
+    // [NOTE]
+    // The uploaded area must be aligned by the boundary.
+    // Also, it is assumed that it must not be a copy area.
+    // So if the areas overlap, include the uploaded area as an untreated area.
+    //
+    for(filepart_list_t::iterator cur_iter = upload_list.begin(); cur_iter != upload_list.end(); /* ++cur_iter */){
+        // Check overlap
+        if((cur_iter->startpos + cur_iter->size - 1) < aligned_start || (aligned_start + aligned_size - 1) < cur_iter->startpos){
+            // Areas do not overlap
+            ++cur_iter;
+
+        }else{
+            // The areas overlap
+            //
+            // Since the start position of the uploaded area is aligned with the boundary,
+            // it is not necessary to check the start position.
+            // If the uploaded area exceeds the untreated area, expand the untreated area.
+            //
+            if((aligned_start + aligned_size - 1) < (cur_iter->startpos + cur_iter->size - 1)){
+                aligned_size += (cur_iter->startpos + cur_iter->size) - (aligned_start + aligned_size);
+            }
+
+            //
+            // Add this to cancel list
+            //
+            cancel_upload_list.push_back(*cur_iter);    // Copy and Push to cancel list
+            cur_iter = upload_list.erase(cur_iter);
+        }
+    }
+
+    //
+    // Add upload area to the list
+    //
+    while(max_mp_size <= aligned_size){
+        int part_num = (aligned_start / max_mp_size) + 1;
+        to_upload_list.push_back(mp_part(aligned_start, max_mp_size, part_num));
+
+        aligned_start += max_mp_size;
+        aligned_size  -= max_mp_size;
+    }
+
+    return true;
+}
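A worked example of the function above with max_mp_size=10MB and a boundary-aligned input area (start=20MB, size=30MB), following the code's arithmetic:

    // aligned_start = (20MB / 10MB) * 10MB = 20MB, aligned_size = 30MB
    // if part 3 (20MB, 10MB) had been uploaded before, it overlaps the window,
    //   so it moves to cancel_upload_list and is re-covered by the new parts
    // emitted: (20MB,10MB,part 3), (30MB,10MB,part 4), (40MB,10MB,part 5)
    //   because part_num = aligned_start / max_mp_size + 1 for each slice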
+
+//
+// Extract the area lists to be uploaded/downloaded for the entire file.
+//
+// [Parameters]
+// to_upload_list       : A list of areas to upload in multipart upload.
+// to_copy_list         : A list of areas for copy upload in multipart upload.
+// to_download_list     : A list of areas that must be downloaded before multipart upload.
+// cancel_upload_list   : A list of areas that have already been uploaded and will be canceled(overwritten).
+// file_size            : The size of the upload file.
+// use_copy             : Specify true if copy multipart upload is available.
+//
+// [NOTE]
+// The untreated_list does not change, but upload_list is changed.
+// (If you want to restore it, you can use cancel_upload_list.)
+//
+bool PseudoFdInfo::ExtractUploadPartsFromAllArea(mp_part_list_t& to_upload_list, mp_part_list_t& to_copy_list, mp_part_list_t& to_download_list, filepart_list_t& cancel_upload_list, off_t max_mp_size, off_t file_size, bool use_copy)
+{
+    AutoLock auto_lock(&upload_list_lock);
+
+    // Initialize lists
+    to_upload_list.clear();
+    to_copy_list.clear();
+    to_download_list.clear();
+    cancel_upload_list.clear();
+
+    // Duplicate untreated list
+    untreated_list_t dup_untreated_list;
+    untreated_list.Duplicate(dup_untreated_list);
+
+    // Initialize the iterator of each list first
+    untreated_list_t::iterator dup_untreated_iter = dup_untreated_list.begin();
+    filepart_list_t::iterator  uploaded_iter      = upload_list.begin();
+
+    //
+    // Loop to extract areas to upload and download
+    //
+    // Check at the boundary of the maximum upload size from the beginning of the file
+    //
+    for(off_t cur_start = 0, cur_size = 0; cur_start < file_size; cur_start += cur_size){
+        //
+        // Set part size
+        // (To avoid confusion, the area to be checked is called the "current area".)
+        //
+        cur_size = ((cur_start + max_mp_size) <= file_size ? max_mp_size : (file_size - cur_start));
+
+        //
+        // Extract the untreated area that overlaps this current area.
+        // (The extracted area is deleted from dup_untreated_list.)
+        //
+        untreated_list_t cur_untreated_list;
+        for(cur_untreated_list.clear(); dup_untreated_iter != dup_untreated_list.end(); ){
+            if((dup_untreated_iter->start < (cur_start + cur_size)) && (cur_start < (dup_untreated_iter->start + dup_untreated_iter->size))){
+                // this untreated area overlaps
+                off_t tmp_untreated_start;
+                off_t tmp_untreated_size;
+                if(dup_untreated_iter->start < cur_start){
+                    // [NOTE]
+                    // This untreated area overlaps with the current area, but starts
+                    // in front of the current area.
+                    // This state should not be possible, but if this state is detected,
+                    // the part before the current area will be deleted.
+                    //
+                    tmp_untreated_start = cur_start;
+                    tmp_untreated_size  = dup_untreated_iter->size - (cur_start - dup_untreated_iter->start);
+                }else{
+                    tmp_untreated_start = dup_untreated_iter->start;
+                    tmp_untreated_size  = dup_untreated_iter->size;
+                }
+
+                //
+                // Check the end of the overlapping untreated area.
+                //
+                if((tmp_untreated_start + tmp_untreated_size) <= (cur_start + cur_size)){
+                    //
+                    // The whole untreated area is within the current area
+                    //
+                    // - Add this untreated area to cur_untreated_list
+                    // - Delete this from dup_untreated_list
+                    //
+                    cur_untreated_list.push_back(untreatedpart(tmp_untreated_start, tmp_untreated_size));
+                    dup_untreated_iter = dup_untreated_list.erase(dup_untreated_iter);
+                }else{
+                    //
+                    // The untreated area exceeds the end of the current area
+                    //
+
+                    // Adjust untreated area
+                    tmp_untreated_size = (cur_start + cur_size) - tmp_untreated_start;
+
+                    // Add adjusted untreated area to cur_untreated_list
+                    cur_untreated_list.push_back(untreatedpart(tmp_untreated_start, tmp_untreated_size));
+
+                    // Remove this adjusted untreated area from the area pointed
+                    // to by dup_untreated_iter.
+                    dup_untreated_iter->size  = (dup_untreated_iter->start + dup_untreated_iter->size) - (cur_start + cur_size);
+                    dup_untreated_iter->start = tmp_untreated_start + tmp_untreated_size;
+                }
+
+            }else if((cur_start + cur_size - 1) < dup_untreated_iter->start){
+                // this untreated area is beyond the current area, thus break the loop.
+                break;
+            }else{
+                ++dup_untreated_iter;
+            }
+        }
+
+        //
+        // Check uploaded area
+        //
+        // [NOTE]
+        // The uploaded area should be aligned with the maximum upload size boundary.
+        // It also assumes that each size of uploaded area must be a maximum upload
+        // size.
+        //
+        filepart_list_t::iterator overlap_uploaded_iter = upload_list.end();
+        for(; uploaded_iter != upload_list.end(); ++uploaded_iter){
+            if((cur_start < (uploaded_iter->startpos + uploaded_iter->size)) && (uploaded_iter->startpos < (cur_start + cur_size))){
+                if(overlap_uploaded_iter != upload_list.end()){
+                    //
+                    // Something is wrong in this uploaded area.
+                    //
+                    // This area is not aligned with the boundary, so this condition
+                    // is unrecoverable and failure is returned.
+                    //
+                    S3FS_PRN_ERR("The uploaded list may not be the boundary for the maximum multipart upload size. No further processing is possible.");
+                    return false;
+                }
+                // Set this iterator to the overlap iter
+                overlap_uploaded_iter = uploaded_iter;
+
+            }else if((cur_start + cur_size - 1) < uploaded_iter->startpos){
+                break;
+            }
+        }
+
+        //
+        // Create upload/download/cancel/copy list for this current area
+        //
+        int part_num = (cur_start / max_mp_size) + 1;
+        if(cur_untreated_list.empty()){
+            //
+            // No untreated area was detected in this current area
+            //
+            if(overlap_uploaded_iter != upload_list.end()){
+                //
+                // This current area is already uploaded, then nothing to add to lists.
+                //
+                S3FS_PRN_DBG("Already uploaded: start=%lld, size=%lld", static_cast<long long>(cur_start), static_cast<long long>(cur_size));
+
+            }else{
+                //
+                // This current area has not been uploaded
+                // (neither an uploaded area nor an untreated area.)
+                //
+                if(use_copy){
+                    //
+                    // Copy multipart upload available
+                    //
+                    S3FS_PRN_DBG("To copy: start=%lld, size=%lld", static_cast<long long>(cur_start), static_cast<long long>(cur_size));
+                    to_copy_list.push_back(mp_part(cur_start, cur_size, part_num));
+                }else{
+                    //
+                    // This current area needs to be downloaded and uploaded
+                    //
+                    S3FS_PRN_DBG("To download and upload: start=%lld, size=%lld", static_cast<long long>(cur_start), static_cast<long long>(cur_size));
+                    to_download_list.push_back(mp_part(cur_start, cur_size));
+                    to_upload_list.push_back(mp_part(cur_start, cur_size, part_num));
+                }
+            }
+        }else{
+            //
+            // Found untreated area in this current area
+            //
+            if(overlap_uploaded_iter != upload_list.end()){
+                //
+                // This current area is also the uploaded area
+                //
+                // [NOTE]
+                // The uploaded area is aligned with the boundary, so all data in
+                // this current area exists locally(which includes all data of the
+                // untreated area).
+                // So this current area only needs to be uploaded again.
+                //
+                S3FS_PRN_DBG("Cancel upload: start=%lld, size=%lld", static_cast<long long>(overlap_uploaded_iter->startpos), static_cast<long long>(overlap_uploaded_iter->size));
+                cancel_upload_list.push_back(*overlap_uploaded_iter);               // add this uploaded area to cancel_upload_list
+                upload_list.erase(overlap_uploaded_iter);                           // remove it from upload_list
+
+                S3FS_PRN_DBG("To upload: start=%lld, size=%lld", static_cast<long long>(cur_start), static_cast<long long>(cur_size));
+                to_upload_list.push_back(mp_part(cur_start, cur_size, part_num));   // add new uploading area to list
+
+            }else{
+                //
+                // No uploaded area overlaps this current area
+                // (Areas other than the untreated area must be downloaded.)
+                //
+                // [NOTE]
+                // Need to consider the case where there is a gap between the start
+                // of the current area and the untreated area.
+                // This gap is the area that should normally be downloaded.
+                // But it is the area that can be copied if copy multipart upload is
+                // available.
+                // If copy multipart upload can be used and the previous
+                // area also used it, this gap will be absorbed by the previous area.
+                // Unifying the copy multipart upload areas can reduce the number of
+                // upload requests.
+                //
+                off_t tmp_cur_start = cur_start;
+                off_t tmp_cur_size  = cur_size;
+                off_t changed_start = cur_start;
+                off_t changed_size  = cur_size;
+                bool  first_area    = true;
+                for(untreated_list_t::const_iterator tmp_cur_untreated_iter = cur_untreated_list.begin(); tmp_cur_untreated_iter != cur_untreated_list.end(); ++tmp_cur_untreated_iter, first_area = false){
+                    if(tmp_cur_start < tmp_cur_untreated_iter->start){
+                        //
+                        // Detected a gap at the start of the area
+                        //
+                        bool include_prev_copy_part = false;
+                        if(first_area && use_copy && !to_copy_list.empty()){
+                            //
+                            // Make sure that the area of the last item in to_copy_list
+                            // is contiguous with this current area.
+                            //
+                            // [NOTE]
+                            // Areas can be unified if the total size of the areas is
+                            // within 5GB and the remaining area after unification is
+                            // larger than the minimum multipart upload size.
+                            //
+                            mp_part_list_t::reverse_iterator copy_riter = to_copy_list.rbegin();
+
+                            if( (copy_riter->start + copy_riter->size) == tmp_cur_start &&
+                                (copy_riter->size + (tmp_cur_untreated_iter->start - tmp_cur_start)) <= FIVE_GB &&
+                                ((tmp_cur_start + tmp_cur_size) - (tmp_cur_untreated_iter->start - tmp_cur_start)) >= MIN_MULTIPART_SIZE )
+                            {
+                                //
+                                // Unify this area to the previous copy area.
+                                //
+                                copy_riter->size += tmp_cur_untreated_iter->start - tmp_cur_start;
+                                S3FS_PRN_DBG("Resize to copy: start=%lld, size=%lld", static_cast<long long>(copy_riter->start), static_cast<long long>(copy_riter->size));
+
+                                changed_size -= (tmp_cur_untreated_iter->start - changed_start);
+                                changed_start = tmp_cur_untreated_iter->start;
+                                include_prev_copy_part = true;
+                            }
+                        }
+                        if(!include_prev_copy_part){
+                            //
+                            // If this area is not unified, need to download this area
+                            //
+                            S3FS_PRN_DBG("To download: start=%lld, size=%lld", static_cast<long long>(tmp_cur_start), static_cast<long long>(tmp_cur_untreated_iter->start - tmp_cur_start));
+                            to_download_list.push_back(mp_part(tmp_cur_start, tmp_cur_untreated_iter->start - tmp_cur_start));
+                        }
+                    }
+
+                    //
+                    // Set next start position
+                    //
+                    tmp_cur_size  = (tmp_cur_start + tmp_cur_size) - (tmp_cur_untreated_iter->start + tmp_cur_untreated_iter->size);
+                    tmp_cur_start = tmp_cur_untreated_iter->start + tmp_cur_untreated_iter->size;
+                }
+
+                //
+                // Add download area to list, if any size remains
+                //
+                if(0 < tmp_cur_size){
+                    S3FS_PRN_DBG("To download: start=%lld, size=%lld", static_cast<long long>(tmp_cur_start), static_cast<long long>(tmp_cur_size));
+                    to_download_list.push_back(mp_part(tmp_cur_start, tmp_cur_size));
+                }
+
+                //
+                // Set upload area(whole of area) to list
+                //
+                S3FS_PRN_DBG("To upload: start=%lld, size=%lld", static_cast<long long>(changed_start), static_cast<long long>(changed_size));
+                to_upload_list.push_back(mp_part(changed_start, changed_size, part_num));
+            }
+        }
+    }
+    return true;
+}
 
 /*
 * Local variables:
 * tab-width: 4
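A worked example of the whole-file extraction above, with max_mp_size=10MB, file_size=35MB, use_copy=true, one untreated area (12MB, 3MB), and no parts uploaded yet:

    // [0,10MB):  untouched -> to_copy_list as part 1
    // [10,20MB): untreated (12MB,3MB); the 10..12MB gap is contiguous with the
    //            copy area before it, so part 1's copy grows to (0,12MB);
    //            15..20MB goes to to_download_list, and the rest of this current
    //            area (12MB,8MB) is uploaded as part 2
    // [20,30MB): untouched -> to_copy_list as part 3
    // [30,35MB): untouched final 5MB -> to_copy_list as part 4
    // cancel_upload_list stays empty because nothing was uploaded before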
diff --git a/src/fdcache_fdinfo.h b/src/fdcache_fdinfo.h
index a708e9f..86b064d 100644
--- a/src/fdcache_fdinfo.h
+++ b/src/fdcache_fdinfo.h
@@ -22,6 +22,28 @@
 #define S3FS_FDCACHE_FDINFO_H_
 
 #include "fdcache_untreated.h"
+#include "psemaphore.h"
+#include "metaheader.h"
+
+//------------------------------------------------
+// Structure of parameters to pass to thread
+//------------------------------------------------
+class PseudoFdInfo;
+
+struct pseudofdinfo_thparam
+{
+    PseudoFdInfo* ppseudofdinfo;
+    std::string   path;
+    std::string   upload_id;
+    int           upload_fd;
+    off_t         start;
+    off_t         size;
+    bool          is_copy;
+    int           part_num;
+    etagpair*     petag;
+
+    pseudofdinfo_thparam() : ppseudofdinfo(NULL), path(""), upload_id(""), upload_fd(-1), start(0), size(0), is_copy(false), part_num(-1), petag(NULL) {}
+};
 
 //------------------------------------------------
 // Class PseudoFdInfo
@@ -29,19 +51,36 @@ class PseudoFdInfo
 {
     private:
-        int             pseudo_fd;
-        int             physical_fd;
-        int             flags;              // flags at open
-        std::string     upload_id;
-        filepart_list_t upload_list;
-        UntreatedParts  untreated_list;     // list of untreated parts that have been written and not yet uploaded(for streamupload)
-        etaglist_t      etag_entities;      // list of etag string and part number entities(to maintain the etag entity even if MPPART_INFO is destroyed)
+        static int      max_threads;
+        static int      opt_max_threads;    // for option value
 
-        bool            is_lock_init;
-        pthread_mutex_t upload_list_lock;   // protects upload_id and upload_list
+        int             pseudo_fd;
+        int             physical_fd;
+        int             flags;              // flags at open
+        std::string     upload_id;
+        int             upload_fd;          // duplicated fd for uploading
+        filepart_list_t upload_list;
+        UntreatedParts  untreated_list;     // list of untreated parts that have been written and not yet uploaded(for streamupload)
+        etaglist_t      etag_entities;      // list of etag string and part number entities(to maintain the etag entity even if MPPART_INFO is destroyed)
+        bool            is_lock_init;
+        pthread_mutex_t upload_list_lock;   // protects upload_id and upload_list
+        Semaphore       uploaded_sem;       // use a semaphore to trigger an upload completion, like an event flag
+        volatile int    instruct_count;     // number of instructions for processing by threads
+        volatile int    completed_count;    // number of completed processes by threads
+        int             last_result;        // the result of thread processing
 
     private:
+        static void* MultipartUploadThreadWorker(void* arg);
+
         bool Clear();
+        void CloseUploadFd(bool lock_already_held = false);
+        bool OpenUploadFd(bool lock_already_held = false);
+        bool GetLastUpdateUntreatedPart(off_t& start, off_t& size);
+        bool ReplaceLastUpdateUntreatedPart(off_t front_start, off_t front_size, off_t behind_start, off_t behind_size);
+        bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy);
+        bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag, bool lock_already_held = false);
+        int WaitAllThreadsExit();
+        bool ExtractUploadPartsFromUntreatedArea(off_t& untreated_start, off_t& untreated_size, mp_part_list_t& to_upload_list, filepart_list_t& cancel_upload_list, off_t max_mp_size);
 
     public:
         PseudoFdInfo(int fd = -1, int open_flags = 0);
@@ -55,7 +94,7 @@ class PseudoFdInfo
         bool Set(int fd, int open_flags);
 
         bool ClearUploadInfo(bool is_clear_part = false, bool lock_already_held = false);
-        bool InitialUploadInfo(const std::string& id);
+        bool InitialUploadInfo(const std::string& id, bool lock_already_held = false);
 
         bool IsUploading() const { return !upload_id.empty(); }
         bool GetUploadId(std::string& id) const;
@@ -65,9 +104,12 @@ class PseudoFdInfo
 
         void ClearUntreated(bool lock_already_held = false);
         bool ClearUntreated(off_t start, off_t size);
-        bool GetUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE);
         bool GetLastUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE);
         bool AddUntreated(off_t start, off_t size);
+
+        bool ParallelMultipartUploadAll(const char* path, const mp_part_list_t& upload_list, const mp_part_list_t& copy_list, int& result);
+        ssize_t UploadBoundaryLastUntreatedArea(const char* path, headers_t& meta);
+        bool ExtractUploadPartsFromAllArea(mp_part_list_t& to_upload_list, mp_part_list_t& to_copy_list, mp_part_list_t& to_download_list, filepart_list_t& cancel_upload_list, off_t max_mp_size, off_t file_size, bool use_copy);
 };
 
 typedef std::map<int, PseudoFdInfo*> fdinfo_map_t;
diff --git a/src/fdcache_untreated.cpp b/src/fdcache_untreated.cpp
index 78ace1b..87bbd2f 100644
--- a/src/fdcache_untreated.cpp
+++ b/src/fdcache_untreated.cpp
@@ -91,8 +91,9 @@ bool UntreatedParts::AddPart(off_t start, off_t size)
             return true;
 
         }else if((start + size) < iter->start){
-            // The part to add should be inserted before the current part. 
+            // The part to add should be inserted before the current part.
             untreated_list.insert(iter, untreatedpart(start, size, last_tag));
+            // succeeded to stretch and compress existing parts
             return true;
         }
     }
@@ -133,90 +134,6 @@ bool UntreatedParts::RowGetPart(off_t& start, off_t& size, off_t max_size, off_t
     return false;
 }
 
-// [NOTE]
-// The part with the last tag cannot be taken out if it has not reached max_size.
-//
-bool UntreatedParts::TakeoutPart(off_t& start, off_t& size, off_t max_size, off_t min_size)
-{
-    if(max_size <= 0 || min_size < 0 || max_size < min_size){
-        S3FS_PRN_ERR("Paramter are wrong(max_size=%lld, min_size=%lld).", static_cast<long long>(max_size), static_cast<long long>(min_size));
-        return false;
-    }
-    AutoLock auto_lock(&untreated_list_lock);
-
-    // Check the overlap with the existing part and add the part.
-    for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){
-        if(iter->untreated_tag == last_tag){
-            // Last updated part
-            if(max_size <= iter->size){
-                // Take out only when the maximum part size is exceeded
-                start       = iter->start;
-                size        = max_size;
-                iter->start = iter->start + max_size;
-                iter->size  = iter->size - max_size;
-
-                if(iter->size == 0){
-                    untreated_list.erase(iter);
-                }
-                return true;
-            }
-        }else{
-            // Parts updated in the past
-            if(min_size <= iter->size){
-                if(iter->size <= max_size){
-                    // Take out the whole part( min <= part size <= max )
-                    start = iter->start;
-                    size  = iter->size;
-                    untreated_list.erase(iter);
-                }else{
-                    // Partially take out part( max < part size )
-                    start       = iter->start;
-                    size        = max_size;
-                    iter->start = iter->start + max_size;
-                    iter->size  = iter->size - max_size;
-                }
-                return true;
-            }
-        }
-    }
-    return false;
-}
-
-// [NOTE]
-// This method returns the part from the beginning, ignoring conditions
-// such as whether it is being updated(the last updated part) or less
-// than the minimum size.
-//
-bool UntreatedParts::TakeoutPartFromBegin(off_t& start, off_t& size, off_t max_size)
-{
-    if(max_size <= 0){
-        S3FS_PRN_ERR("Paramter is wrong(max_size=%lld).", static_cast<long long>(max_size));
-        return false;
-    }
-    AutoLock auto_lock(&untreated_list_lock);
-
-    if(untreated_list.empty()){
-        return false;
-    }
-
-    untreated_list_t::iterator iter = untreated_list.begin();
-    if(iter->size <= max_size){
-        // Take out the whole part( part size <= max )
-        start = iter->start;
-        size  = iter->size;
-
-        untreated_list.erase(iter);
-    }else{
-        // Take out only when the maximum part size is exceeded
-        start = iter->start;
-        size  = max_size;
-
-        iter->start = iter->start + max_size;
-        iter->size  = iter->size - max_size;
-    }
-    return true;
-}
-
 // [NOTE]
 // If size is specified as 0, all areas(parts) after start will be deleted.
 //
 //
@@ -251,7 +168,7 @@
             }
         }else if(start < (iter->start + iter->size)){
             // clear area overlaps with iter area(on the end side)
-            if(0 == size || (iter->start + iter->size) <= (start + size) ){
+            if(0 == size || (iter->start + iter->size) <= (start + size)){
                // start to iter->end is clear
                iter->size = start - iter->start;
             }else{
@@ -274,6 +191,85 @@
     return true;
 }

+//
+// Get the last updated Untreated part
+//
+bool UntreatedParts::GetLastUpdatePart(off_t& start, off_t& size)
+{
+    AutoLock auto_lock(&untreated_list_lock);
+
+    for(untreated_list_t::const_iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){
+        if(iter->untreated_tag == last_tag){
+            start = iter->start;
+            size = iter->size;
+            return true;
+        }
+    }
+    return false;
+}
+
+//
+// Replaces the last updated Untreated part.
+//
+// [NOTE]
+// If size <= 0, delete that part
+//
+bool UntreatedParts::ReplaceLastUpdatePart(off_t start, off_t size)
+{
+    AutoLock auto_lock(&untreated_list_lock);
+
+    for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){
+        if(iter->untreated_tag == last_tag){
+            if(0 < size){
+                iter->start = start;
+                iter->size = size;
+            }else{
+                untreated_list.erase(iter);
+            }
+            return true;
+        }
+    }
+    return false;
+}
+
+//
+// Remove the last updated Untreated part.
+//
+bool UntreatedParts::RemoveLastUpdatePart()
+{
+    AutoLock auto_lock(&untreated_list_lock);
+
+    for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){
+        if(iter->untreated_tag == last_tag){
+            untreated_list.erase(iter);
+            return true;
+        }
+    }
+    return false;
+}
+
+//
+// Duplicate the internal untreated_list.
+//
+bool UntreatedParts::Duplicate(untreated_list_t& list)
+{
+    AutoLock auto_lock(&untreated_list_lock);
+
+    list = untreated_list;
+    return true;
+}
+
+void UntreatedParts::Dump()
+{
+    AutoLock auto_lock(&untreated_list_lock);
+
+    S3FS_PRN_DBG("untreated list = [");
+    for(untreated_list_t::const_iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){
+        S3FS_PRN_DBG("  {%014lld - %014lld : tag=%ld}", static_cast<long long>(iter->start), static_cast<long long>(iter->size), iter->untreated_tag);
+    }
+    S3FS_PRN_DBG("]");
+}
+
 /*
 * Local variables:
 * tab-width: 4
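For orientation, the three new last-update accessors above are meant to be used together. The following sketch is illustrative only and not part of the patch; upload_part() is a hypothetical helper, and a 10MB part size is assumed. It shows how a caller could carve full parts off the front of the last updated untreated area and leave the remainder for a later upload, which is essentially the boundary handling the stream upload path needs.

    // Illustrative sketch (not part of the patch): consume full parts from
    // the last updated untreated area, keeping the tail untreated.
    void flush_full_parts(UntreatedParts& untreated, off_t part_size /* e.g. 10MB */)
    {
        off_t start = 0;
        off_t size  = 0;
        if(!untreated.GetLastUpdatePart(start, size)){
            return;                              // no last-updated area exists
        }
        while(part_size <= size){
            upload_part(start, part_size);       // hypothetical uploader
            start += part_size;
            size  -= part_size;
        }
        // If size has shrunk to 0, ReplaceLastUpdatePart deletes the entry,
        // matching the "[NOTE] If size <= 0, delete that part" contract above.
        untreated.ReplaceLastUpdatePart(start, size);
    }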
diff --git a/src/fdcache_untreated.h b/src/fdcache_untreated.h
index 89738c3..3704953 100644
--- a/src/fdcache_untreated.h
+++ b/src/fdcache_untreated.h
@@ -46,19 +46,18 @@ class UntreatedParts
         bool empty();

         bool AddPart(off_t start, off_t size);
-
-        // [NOTE]
-        // The following method does not return parts smaller than mini_size.
-        // You can avoid it by setting min_size to 0.
-        //
-        bool GetPart(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE) { return RowGetPart(start, size, max_size, min_size, false); }
         bool GetLastUpdatedPart(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE) { return RowGetPart(start, size, max_size, min_size, true); }
-        bool TakeoutPart(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE);
-        bool TakeoutPartFromBegin(off_t& start, off_t& size, off_t max_size);
-
         bool ClearParts(off_t start, off_t size);
         bool ClearAll() { return ClearParts(0, 0); }
+
+        bool GetLastUpdatePart(off_t& start, off_t& size);
+        bool ReplaceLastUpdatePart(off_t start, off_t size);
+        bool RemoveLastUpdatePart();
+
+        bool Duplicate(untreated_list_t& list);
+
+        void Dump();
 };

 #endif // S3FS_FDCACHE_UNTREATED_H_
diff --git a/src/psemaphore.h b/src/psemaphore.h
index 402571a..3cc167d 100644
--- a/src/psemaphore.h
+++ b/src/psemaphore.h
@@ -42,6 +42,14 @@ class Semaphore
            dispatch_release(sem);
        }
        void wait() { dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); }
+       bool try_wait()
+       {
+           if(0 == dispatch_semaphore_wait(sem, DISPATCH_TIME_NOW)){
+               return true;
+           }else{
+               return false;
+           }
+       }
        void post() { dispatch_semaphore_signal(sem); }
        int get_value() const { return value; }
@@ -67,6 +75,15 @@ class Semaphore
            r = sem_wait(&mutex);
        } while (r == -1 && errno == EINTR);
    }
+   bool try_wait()
+   {
+       int result;
+       do{
+           result = sem_trywait(&mutex);
+       }while(result == -1 && errno == EINTR);
+
+       return (0 == result);
+   }
    void post() { sem_post(&mutex); }
    int get_value() const { return value; }
diff --git a/src/s3fs.cpp b/src/s3fs.cpp
index 0319795..1d5071f 100644
--- a/src/s3fs.cpp
+++ b/src/s3fs.cpp
@@ -47,6 +47,7 @@
 #include "s3fs_help.h"
 #include "s3fs_util.h"
 #include "mpu_util.h"
+#include "threadpoolman.h"

 //-------------------------------------------------------------------
 // Symbols
@@ -95,6 +96,7 @@ static int max_keys_list_object = 1000;// default is 1000
 static off_t max_dirty_data = 5LL * 1024LL * 1024LL * 1024LL;
 static bool use_wtf8 = false;
 static off_t fake_diskfree_size = -1; // default is not set(-1)
+static int max_thread_count = 5; // default is 5

 //-------------------------------------------------------------------
 // Global functions : prototype
@@ -4084,6 +4086,16 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar
             S3fsCurl::SetMaxParallelCount(maxpara);
             return 0;
         }
+        if(is_prefix(arg, "max_thread_count=")){
+            int max_thcount = static_cast<int>(cvt_strtoofft(strchr(arg, '=') + sizeof(char), /*base=*/ 10));
+            if(0 >= max_thcount){
+                S3FS_PRN_EXIT("argument should be over 1: max_thread_count");
+                return -1;
+            }
+            max_thread_count = max_thcount;
+            S3FS_PRN_WARN("The max_thread_count option is not a formal option. Please note that it will change in the future.");
+            return 0;
+        }
         if(is_prefix(arg, "fd_page_size=")){
             S3FS_PRN_ERR("option fd_page_size is no longer supported, so skip this option.");
             return 0;
@@ -4164,6 +4176,11 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar
             nocopyapi = true;
             return 0;
         }
+        if(0 == strcmp(arg, "streamupload")){
+            FdEntity::SetStreamUpload(true);
+            S3FS_PRN_WARN("The streamupload option is not a formal option. Please note that it will change in the future.");
+            return 0;
+        }
         if(0 == strcmp(arg, "norenameapi")){
             norenameapi = true;
             return 0;
         }
@@ -4672,6 +4689,14 @@ int main(int argc, char* argv[])
         max_dirty_data = -1;
     }

+    if(!ThreadPoolMan::Initialize(max_thread_count)){
+        S3FS_PRN_EXIT("Could not create thread pool(%d)", max_thread_count);
+        S3fsCurl::DestroyS3fsCurl();
+        s3fs_destroy_global_ssl();
+        destroy_parser_xml_lock();
+        exit(EXIT_FAILURE);
+    }
+
     // check free disk space
     if(!FdManager::IsSafeDiskSpace(NULL, S3fsCurl::GetMultipartSize() * S3fsCurl::GetMaxParallelCount())){
         S3FS_PRN_EXIT("There is no enough disk space for used as cache(or temporary) directory by s3fs.");
@@ -4729,6 +4754,9 @@ int main(int argc, char* argv[])
     }
     fuse_opt_free_args(&custom_args);

+    // Destroy thread pool
+    ThreadPoolMan::Destroy();
+
     // Destroy curl
     if(!S3fsCurl::DestroyS3fsCurl()){
         S3FS_PRN_WARN("Could not release curl library.");
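One note before the new thread-pool source: the Semaphore::try_wait() added above never blocks, so it can drain a counting semaphore back to zero, which is exactly the reset idiom ThreadPoolMan::StopThreads() relies on further below. A minimal illustrative snippet (not part of the patch):

    // Illustrative only: drain a Semaphore back to zero without blocking.
    Semaphore sem(0);
    sem.post();
    sem.post();

    int drained = 0;
    while(sem.try_wait()){   // succeeds twice, then returns false immediately
        ++drained;
    }
    // drained == 2 here; a subsequent sem.wait() would block until post().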
diff --git a/src/threadpoolman.cpp b/src/threadpoolman.cpp
new file mode 100644
index 0000000..ed1d9f8
--- /dev/null
+++ b/src/threadpoolman.cpp
@@ -0,0 +1,261 @@
+/*
+ * s3fs - FUSE-based file system backed by Amazon S3
+ *
+ * Copyright(C) 2007 Takeshi Nakatani
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+#include <cstdio>
+#include <cstdlib>
+#include <cerrno>
+#include <unistd.h>
+
+#include "common.h"
+#include "s3fs.h"
+#include "threadpoolman.h"
+#include "autolock.h"
+
+//------------------------------------------------
+// ThreadPoolMan class variables
+//------------------------------------------------
+ThreadPoolMan* ThreadPoolMan::singleton = NULL;
+
+//------------------------------------------------
+// ThreadPoolMan class methods
+//------------------------------------------------
+bool ThreadPoolMan::Initialize(int count)
+{
+    if(ThreadPoolMan::singleton){
+        S3FS_PRN_WARN("The singleton for Thread Manager already exists, so it will be re-created.");
+        ThreadPoolMan::Destroy();
+    }
+    ThreadPoolMan::singleton = new ThreadPoolMan(count);
+    return true;
+}
+
+void ThreadPoolMan::Destroy()
+{
+    if(ThreadPoolMan::singleton){
+        delete ThreadPoolMan::singleton;
+        ThreadPoolMan::singleton = NULL;
+    }
+}
+
+bool ThreadPoolMan::Instruct(thpoolman_param* pparam)
+{
+    if(!ThreadPoolMan::singleton){
+        S3FS_PRN_WARN("The singleton object is not initialized yet.");
+        return false;
+    }
+    return ThreadPoolMan::singleton->SetInstruction(pparam);
+}
+
+//
+// Thread worker
+//
+void* ThreadPoolMan::Worker(void* arg)
+{
+    ThreadPoolMan* psingleton = static_cast<ThreadPoolMan*>(arg);
+
+    if(!psingleton){
+        S3FS_PRN_ERR("The parameter for worker thread is invalid.");
+        return (void*)(intptr_t)(-EIO);
+    }
+    S3FS_PRN_INFO3("Start worker thread in ThreadPoolMan.");
+
+    while(!psingleton->is_exit){
+        // wait
+        psingleton->thpoolman_sem.wait();
+
+        if(psingleton->is_exit){
+            break;
+        }
+
+        // get instruction
+        thpoolman_param* pparam;
+        {
+            AutoLock auto_lock(&(psingleton->thread_list_lock));
+
+            if(!psingleton->instruction_list.empty()){
+                pparam = psingleton->instruction_list.front();
+                psingleton->instruction_list.pop_front();
+                if(!pparam){
+                    S3FS_PRN_WARN("Got a semaphore, but the instruction is empty.");
+                }
+            }else{
+                S3FS_PRN_WARN("Got a semaphore, but there is no instruction.");
+                pparam = NULL;
+            }
+        }
+
+        if(pparam){
+            void* retval     = pparam->pfunc(pparam->args);
+            int   int_retval = (int)(intptr_t)(retval);
+            if(0 != int_retval){
+                S3FS_PRN_WARN("The instruction function returned an error code(%d).", int_retval);
+            }
+            if(pparam->psem){
+                pparam->psem->post();
+            }
+            delete pparam;
+        }
+    }
+
+    return (void*)(intptr_t)(0);
+}
+
+//------------------------------------------------
+// ThreadPoolMan methods
+//------------------------------------------------
+ThreadPoolMan::ThreadPoolMan(int count) : is_exit(false), thpoolman_sem(0)
+{
+    if(count < 1){
+        S3FS_PRN_CRIT("Failed to create the singleton for Thread Manager, because thread count(%d) is under 1.", count);
+        abort();
+    }
+    if(ThreadPoolMan::singleton){
+        S3FS_PRN_CRIT("The singleton for Thread Manager already exists.");
+        abort();
+    }
+
+    is_lock_init = false;
+    pthread_mutexattr_t attr;
+    pthread_mutexattr_init(&attr);
+#if S3FS_PTHREAD_ERRORCHECK
+    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
+#endif
+    int result;
+    if(0 != (result = pthread_mutex_init(&thread_list_lock, &attr))){
+        S3FS_PRN_CRIT("failed to init thread_list_lock: %d", result);
+        abort();
+    }
+    is_lock_init = true;
+
+    // create threads
+    if(!StartThreads(count)){
+        S3FS_PRN_ERR("Failed to start threads during initialization.");
+        abort();
+    }
+}
+
+ThreadPoolMan::~ThreadPoolMan()
+{
+    StopThreads();
+
+    if(is_lock_init){
+        int result;
+        if(0 != (result = pthread_mutex_destroy(&thread_list_lock))){
+            S3FS_PRN_CRIT("failed to destroy thread_list_lock: %d", result);
+            abort();
+        }
+        is_lock_init = false;
+    }
+}
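The ownership contract implied by Worker() above: the pool takes ownership of a queued thpoolman_param and deletes it after running pfunc, posting psem just before. A hedged usage sketch (not part of the patch; my_task and run_one_task are hypothetical names, and threadpoolman.h/psemaphore.h are assumed included):

    // Illustrative sketch: dispatch one task to the pool and block until
    // the worker posts the completion semaphore.
    static void* my_task(void* args)          // hypothetical worker function
    {
        int* pvalue = static_cast<int*>(args);
        *pvalue += 1;
        return (void*)(intptr_t)(0);          // 0 means success, as Worker expects
    }

    bool run_one_task(int& value)
    {
        Semaphore        completed(0);
        thpoolman_param* pparam = new thpoolman_param();   // deleted by Worker
        pparam->args  = &value;
        pparam->psem  = &completed;
        pparam->pfunc = my_task;

        if(!ThreadPoolMan::Instruct(pparam)){
            delete pparam;                    // never queued, so caller still owns it
            return false;
        }
        completed.wait();                     // posted by Worker after my_task returns
        return true;
    }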
+
+bool ThreadPoolMan::StopThreads()
+{
+    if(thread_list.empty()){
+        S3FS_PRN_INFO("No threads are running now, so there is nothing to do.");
+        return true;
+    }
+
+    // all threads to exit
+    is_exit = true;
+    for(uint waitcnt = thread_list.size(); 0 < waitcnt; --waitcnt){
+        thpoolman_sem.post();
+    }
+
+    // wait for threads exiting
+    for(thread_list_t::const_iterator iter = thread_list.begin(); iter != thread_list.end(); ++iter){
+        void* retval = NULL;
+        int   result = pthread_join(*iter, &retval);
+        if(result){
+            S3FS_PRN_ERR("failed pthread_join - result(%d)", result);
+        }else{
+            S3FS_PRN_DBG("succeed pthread_join - return code(%d)", (int)(intptr_t)(retval));
+        }
+    }
+    thread_list.clear();
+
+    // reset semaphore(to zero)
+    while(thpoolman_sem.try_wait());
+
+    // clear instructions
+    for(thpoolman_params_t::const_iterator iter = instruction_list.begin(); iter != instruction_list.end(); ++iter){
+        thpoolman_param* pparam = *iter;
+        delete pparam;
+    }
+    instruction_list.clear();
+
+    return true;
+}
+
+bool ThreadPoolMan::StartThreads(int count)
+{
+    if(count < 1){
+        S3FS_PRN_ERR("Failed to create threads, because thread count(%d) is under 1.", count);
+        return false;
+    }
+
+    // stop all threads if they are running.
+    if(!StopThreads()){
+        S3FS_PRN_ERR("Failed to stop existing threads.");
+        return false;
+    }
+
+    // create all threads
+    is_exit = false;
+    for(int cnt = 0; cnt < count; ++cnt){
+        // run thread
+        pthread_t thread;
+        int       result;
+        if(0 != (result = pthread_create(&thread, NULL, ThreadPoolMan::Worker, static_cast<void*>(this)))){
+            S3FS_PRN_ERR("failed pthread_create with return code(%d)", result);
+            StopThreads();        // if possible, stop all threads
+            return false;
+        }
+        thread_list.push_back(thread);
+    }
+    return true;
+}
+
+bool ThreadPoolMan::SetInstruction(thpoolman_param* pparam)
+{
+    if(!pparam){
+        S3FS_PRN_ERR("The parameter value is NULL.");
+        return false;
+    }
+
+    // set parameter to list
+    {
+        AutoLock auto_lock(&thread_list_lock);
+        instruction_list.push_back(pparam);
+    }
+
+    // run thread
+    thpoolman_sem.post();
+
+    return true;
+}
+
+/*
+* Local variables:
+* tab-width: 4
+* c-basic-offset: 4
+* End:
+* vim600: expandtab sw=4 ts=4 fdm=marker
+* vim<600: expandtab sw=4 ts=4
+*/
diff --git a/src/threadpoolman.h b/src/threadpoolman.h
new file mode 100644
index 0000000..5066532
--- /dev/null
+++ b/src/threadpoolman.h
@@ -0,0 +1,97 @@
+/*
+ * s3fs - FUSE-based file system backed by Amazon S3
+ *
+ * Copyright(C) 2007 Randy Rizun
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+#ifndef S3FS_THREADPOOLMAN_H_
+#define S3FS_THREADPOOLMAN_H_
+
+#include "psemaphore.h"
+
+//------------------------------------------------
+// Typedefs for functions and structures
+//------------------------------------------------
+//
+// Prototype function
+//
+typedef void* (*thpoolman_worker)(void*);    // same as start_routine for pthread_create function
+
+//
+// Parameter structure
+//
+// [NOTE]
+// The args member is passed as the argument to the worker function.
+// The psem member may be NULL. If it is not NULL, its post() method is
+// called when the function finishes.
+//
+struct thpoolman_param
+{
+    void*            args;
+    Semaphore*       psem;
+    thpoolman_worker pfunc;
+
+    thpoolman_param() : args(NULL), psem(NULL), pfunc(NULL) {}
+};
+
+typedef std::list<thpoolman_param*> thpoolman_params_t;
+
+typedef std::list<pthread_t> thread_list_t;
+
+//------------------------------------------------
+// Class ThreadPoolMan
+//------------------------------------------------
+class ThreadPoolMan
+{
+    private:
+        static ThreadPoolMan* singleton;
+
+        volatile bool         is_exit;
+        Semaphore             thpoolman_sem;
+
+        bool                  is_lock_init;
+        pthread_mutex_t       thread_list_lock;
+        thread_list_t         thread_list;
+
+        thpoolman_params_t    instruction_list;
+
+    private:
+        static void* Worker(void* arg);
+
+        explicit ThreadPoolMan(int count = 1);
+        ~ThreadPoolMan();
+
+        bool StopThreads();
+        bool StartThreads(int count);
+        bool SetInstruction(thpoolman_param* pparam);
+
+    public:
+        static bool Initialize(int count);
+        static void Destroy();
+        static bool Instruct(thpoolman_param* pparam);
+};
+
+#endif // S3FS_THREADPOOLMAN_H_
+
+/*
+* Local variables:
+* tab-width: 4
+* c-basic-offset: 4
+* End:
+* vim600: expandtab sw=4 ts=4 fdm=marker
+* vim<600: expandtab sw=4 ts=4
+*/
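Since a thpoolman_param carries an optional completion semaphore, several tasks can share one Semaphore and be awaited together, which appears to be the pattern behind PseudoFdInfo's uploaded_sem member. An illustrative fan-out sketch (not part of the patch; do_part and run_parts are hypothetical):

    // Illustrative only: queue several tasks and wait for all completions
    // through a single shared semaphore.
    static void* do_part(void* args);         // hypothetical per-part worker

    bool run_parts(void* part_args[], int count)
    {
        Semaphore done(0);
        int       queued = 0;
        for(int i = 0; i < count; ++i){
            thpoolman_param* pparam = new thpoolman_param();   // deleted by the pool worker
            pparam->args  = part_args[i];
            pparam->psem  = &done;
            pparam->pfunc = do_part;
            if(!ThreadPoolMan::Instruct(pparam)){
                delete pparam;                 // never queued, still owned here
                break;
            }
            ++queued;
        }
        for(int i = 0; i < queued; ++i){
            done.wait();                       // one post arrives per finished task
        }
        return queued == count;
    }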
diff --git a/src/types.h b/src/types.h
index 23d00b3..2372a30 100644
--- a/src/types.h
+++ b/src/types.h
@@ -243,7 +243,7 @@ struct filepart
         petag = petagobj;
     }

-    int get_part_number()
+    int get_part_number() const
     {
         if(!petag){
             return -1;
@@ -282,9 +282,13 @@ struct untreatedpart
         untreated_tag = 0;
     }

+    // [NOTE]
+    // Check if the areas overlap
+    // However, even if the areas do not overlap, this method returns true if the areas are adjacent.
+    //
     bool check_overlap(off_t chk_start, off_t chk_size)
     {
-        if(chk_start < 0 || chk_size <= 0 || (chk_start + chk_size) < start || (start + size) < chk_start){
+        if(chk_start < 0 || chk_size <= 0 || start < 0 || size <= 0 || (chk_start + chk_size) < start || (start + size) < chk_start){
            return false;
        }
        return true;
@@ -308,6 +312,32 @@ struct untreatedpart

 typedef std::list<untreatedpart> untreated_list_t;

+//
+// Information on each part of multipart upload
+//
+struct mp_part
+{
+    off_t start;
+    off_t size;
+    int   part_num;    // Set only for information to upload
+
+    mp_part(off_t set_start = 0, off_t set_size = 0, int part = 0) : start(set_start), size(set_size), part_num(part) {}
+};
+
+typedef std::list<mp_part> mp_part_list_t;
+
+struct total_mp_part_list
+{
+    off_t operator()(const mp_part_list_t& mplist) const
+    {
+        off_t size = 0;
+        for(mp_part_list_t::const_iterator iter = mplist.begin(); iter != mplist.end(); ++iter){
+            size += iter->size;
+        }
+        return size;
+    }
+};
+
 //-------------------------------------------------------------------
 // mimes_t
 //-------------------------------------------------------------------
diff --git a/test/small-integration-test.sh b/test/small-integration-test.sh
index 7547187..c51a809 100755
--- a/test/small-integration-test.sh
+++ b/test/small-integration-test.sh
@@ -52,6 +52,7 @@ if [ -n "${ALL_TESTS}" ]; then
         sigv4
         "singlepart_copy_limit=10"  # limit size to exercise multipart code paths
         #use_sse # TODO: S3Proxy does not support SSE
+        "use_cache=${CACHE_DIR} -o ensure_diskfree=${ENSURE_DISKFREE_SIZE} -o fake_diskfree=${FAKE_FREE_DISK_SIZE} -o streamupload"
     )
 else
     FLAGS=(