From 86b5c9d88e8a2e71873324e1adf93f05a48b689f Mon Sep 17 00:00:00 2001 From: Takeshi Nakatani Date: Mon, 15 Jul 2024 06:40:05 +0000 Subject: [PATCH] Refactored multipart put head request --- src/curl.cpp | 145 ++--------------------- src/curl.h | 12 +- src/curl_util.h | 2 + src/fdcache_entity.cpp | 3 - src/s3fs.cpp | 46 ++------ src/s3fs_threadreqs.cpp | 250 ++++++++++++++++++++++++++++++++++++++++ src/s3fs_threadreqs.h | 20 ++++ 7 files changed, 297 insertions(+), 181 deletions(-) diff --git a/src/curl.cpp b/src/curl.cpp index 8bc9bc8..9b3d71e 100644 --- a/src/curl.cpp +++ b/src/curl.cpp @@ -4252,7 +4252,7 @@ int S3fsCurl::MultipartUploadPartRequest(const char* tpath, int part_num, const return result; } -int S3fsCurl::CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta) +int S3fsCurl::CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta) { S3FS_PRN_INFO3("[from=%s][to=%s][part=%d]", SAFESTRPTR(from), SAFESTRPTR(to), part_num); @@ -4344,20 +4344,6 @@ bool S3fsCurl::MultipartUploadPartComplete() return true; } -// cppcheck-suppress unmatchedSuppression -// cppcheck-suppress constParameter -// cppcheck-suppress constParameterCallback -bool S3fsCurl::CopyMultipartUploadCallback(S3fsCurl* s3fscurl, void* param) -{ - if(!s3fscurl || param){ // this callback does not need a parameter - return false; - } - - // cppcheck-suppress unmatchedSuppression - // cppcheck-suppress knownConditionTrueFalse - return s3fscurl->CopyMultipartUploadComplete(); -} - bool S3fsCurl::CopyMultipartUploadComplete() { std::string etag; @@ -4381,65 +4367,27 @@ bool S3fsCurl::MixMultipartUploadComplete() return result; } -int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta) +int S3fsCurl::MultipartPutHeadRequest(const std::string& from, const std::string& to, int part_number, const std::string& upload_id, const headers_t& meta) { - int result; - std::string upload_id; - off_t chunk; - off_t bytes_remaining; - etaglist_t list; + S3FS_PRN_INFO3("[from=%s][to=%s][part_number=%d][upload_id=%s]", from.c_str(), to.c_str(), part_number, upload_id.c_str()); - S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath)); + int result; - if(0 != (result = PreMultipartUploadRequest(tpath, meta, upload_id))){ + // setup + if(0 != (result = CopyMultipartUploadSetup(from.c_str(), to.c_str(), part_number, upload_id, meta))){ + S3FS_PRN_ERR("failed multipart put head request setup(from=%s, to=%s, part_number=%d, upload_id=%s) : %d", from.c_str(), to.c_str(), part_number, upload_id.c_str(), result); return result; } - DestroyCurlHandle(); - - // Initialize S3fsMultiCurl - S3fsMultiCurl curlmulti(GetMaxParallelCount()); - curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartUploadCallback); - curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartUploadRetryCallback); - - for(bytes_remaining = size; 0 < bytes_remaining; bytes_remaining -= chunk){ - chunk = bytes_remaining > GetMultipartCopySize() ? GetMultipartCopySize() : bytes_remaining; - - std::ostringstream strrange; - strrange << "bytes=" << (size - bytes_remaining) << "-" << (size - bytes_remaining + chunk - 1); - meta["x-amz-copy-source-range"] = strrange.str(); - - // s3fscurl sub object - std::unique_ptr s3fscurl_para(new S3fsCurl(true)); - s3fscurl_para->b_from = SAFESTRPTR(tpath); - s3fscurl_para->b_meta = meta; - s3fscurl_para->partdata.add_etag_list(list); - - // initiate upload part for parallel - if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){ - S3FS_PRN_ERR("failed uploading part setup(%d)", result); - return result; - } - - // set into parallel object - if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){ - S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath); - return -EIO; - } + if(!fpLazySetup || !fpLazySetup(this)){ + S3FS_PRN_ERR("failed multipart put head request lazysetup(from=%s, to=%s, part_number=%d, upload_id=%s)", from.c_str(), to.c_str(), part_number, upload_id.c_str()); + return -EIO; } - // Multi request - if(0 != (result = curlmulti.Request())){ - S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result); - int result2; - if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){ - S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2); - } + // request + if(0 != (result = RequestPerform())){ return result; } - if(0 != (result = MultipartUploadComplete(tpath, upload_id, list))){ - return result; - } return 0; } @@ -4466,75 +4414,6 @@ int S3fsCurl::MultipartUploadRequest(const std::string& upload_id, const char* t return 0; } -int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t& meta, off_t size) -{ - int result; - std::string upload_id; - off_t chunk; - off_t bytes_remaining; - etaglist_t list; - - S3FS_PRN_INFO3("[from=%s][to=%s]", SAFESTRPTR(from), SAFESTRPTR(to)); - - std::string srcresource; - std::string srcurl; - MakeUrlResource(get_realpath(from).c_str(), srcresource, srcurl); - - meta["Content-Type"] = S3fsCurl::LookupMimeType(to); - meta["x-amz-copy-source"] = srcresource; - - if(0 != (result = PreMultipartUploadRequest(to, meta, upload_id))){ - return result; - } - DestroyCurlHandle(); - - // Initialize S3fsMultiCurl - S3fsMultiCurl curlmulti(GetMaxParallelCount()); - curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartUploadCallback); - curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartUploadRetryCallback); - - for(bytes_remaining = size; 0 < bytes_remaining; bytes_remaining -= chunk){ - chunk = bytes_remaining > GetMultipartCopySize() ? GetMultipartCopySize() : bytes_remaining; - - std::ostringstream strrange; - strrange << "bytes=" << (size - bytes_remaining) << "-" << (size - bytes_remaining + chunk - 1); - meta["x-amz-copy-source-range"] = strrange.str(); - - // s3fscurl sub object - std::unique_ptr s3fscurl_para(new S3fsCurl(true)); - s3fscurl_para->b_from = SAFESTRPTR(from); - s3fscurl_para->b_meta = meta; - s3fscurl_para->partdata.add_etag_list(list); - - // initiate upload part for parallel - if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(from, to, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){ - S3FS_PRN_ERR("failed uploading part setup(%d)", result); - return result; - } - - // set into parallel object - if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){ - S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", to); - return -EIO; - } - } - - // Multi request - if(0 != (result = curlmulti.Request())){ - S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result); - int result2; - if(0 != (result2 = abort_multipart_upload_request(std::string(to), upload_id))){ - S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2); - } - return result; - } - - if(0 != (result = MultipartUploadComplete(to, upload_id, list))){ - return result; - } - return 0; -} - /* * Local variables: * tab-width: 4 diff --git a/src/curl.h b/src/curl.h index c6e5277..06cd7df 100644 --- a/src/curl.h +++ b/src/curl.h @@ -196,13 +196,13 @@ class S3fsCurl size_t b_ssekey_pos; // backup for retrying std::string b_ssevalue; // backup for retrying sse_type_t b_ssetype; // backup for retrying - std::string b_from; // backup for retrying(for copy request) + std::string b_from; // backup for retrying(for copy request) ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.) headers_t b_meta; // backup for retrying(for copy request) std::string op; // the HTTP verb of the request ("PUT", "GET", etc.) std::string query_string; // request query string Semaphore *sem; - std::mutex *completed_tids_lock; - std::vector *completed_tids PT_GUARDED_BY(*completed_tids_lock); + std::mutex *completed_tids_lock; // ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.) + std::vector *completed_tids PT_GUARDED_BY(*completed_tids_lock); // ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.) s3fscurl_lazy_setup fpLazySetup; // curl options for lazy setting function CURLcode curlCode; // handle curl return @@ -241,7 +241,6 @@ class S3fsCurl static size_t DownloadWriteCallback(void* ptr, size_t size, size_t nmemb, void* userp); static bool MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param); - static bool CopyMultipartUploadCallback(S3fsCurl* s3fscurl, void* param); static bool MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param); static std::unique_ptr MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl); static std::unique_ptr CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl); @@ -280,7 +279,7 @@ class S3fsCurl std::string CalcSignatureV2(const std::string& method, const std::string& strMD5, const std::string& content_type, const std::string& date, const std::string& resource, const std::string& secret_access_key, const std::string& access_token); std::string CalcSignature(const std::string& method, const std::string& canonical_uri, const std::string& query_string, const std::string& strdate, const std::string& payload_hash, const std::string& date8601, const std::string& secret_access_key, const std::string& access_token); int MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id); - int CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta); + int CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta); bool MultipartUploadPartComplete(); bool CopyMultipartUploadComplete(); int MapPutErrorResponse(int result); @@ -385,9 +384,8 @@ class S3fsCurl bool MixMultipartUploadComplete(); int MultipartListRequest(std::string& body); int AbortMultipartUpload(const char* tpath, const std::string& upload_id); - int MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta); + int MultipartPutHeadRequest(const std::string& from, const std::string& to, int part_number, const std::string& upload_id, const headers_t& meta); int MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, etagpair* petagpair); - int MultipartRenameRequest(const char* from, const char* to, headers_t& meta, off_t size); // methods(variables) const std::string& GetPath() const { return path; } diff --git a/src/curl_util.h b/src/curl_util.h index dd12092..89e6757 100644 --- a/src/curl_util.h +++ b/src/curl_util.h @@ -24,6 +24,7 @@ #include #include #include +#include "metaheader.h" enum class sse_type_t : uint8_t; @@ -38,6 +39,7 @@ std::string get_header_value(const struct curl_slist* list, const std::string &k bool MakeUrlResource(const char* realpath, std::string& resourcepath, std::string& url); std::string prepare_url(const char* url); bool get_object_sse_type(const char* path, sse_type_t& ssetype, std::string& ssevalue); // implement in s3fs.cpp +int put_headers(const char* path, const headers_t& meta, bool is_copy, bool use_st_size = true); // implement in s3fs.cpp bool make_md5_from_binary(const char* pstr, size_t length, std::string& md5); std::string url_to_host(const std::string &url); diff --git a/src/fdcache_entity.cpp b/src/fdcache_entity.cpp index 9193f6e..65844f3 100644 --- a/src/fdcache_entity.cpp +++ b/src/fdcache_entity.cpp @@ -2513,9 +2513,6 @@ bool FdEntity::MergeOrgMeta(headers_t& updatemeta) return (pending_status_t::NO_UPDATE_PENDING != pending_status); } -// global function in s3fs.cpp -int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_size = true); - int FdEntity::UploadPendingHasLock(int fd) { int result; diff --git a/src/s3fs.cpp b/src/s3fs.cpp index badadf7..2eb12f2 100644 --- a/src/s3fs.cpp +++ b/src/s3fs.cpp @@ -44,6 +44,7 @@ #include "fdcache_stat.h" #include "curl.h" #include "curl_multi.h" +#include "curl_util.h" #include "s3objlist.h" #include "cache.h" #include "addhead.h" @@ -110,11 +111,6 @@ static bool update_parent_dir_stat= false; // default not updating parent direc static fsblkcnt_t bucket_block_count; // advertised block count of the bucket static unsigned long s3fs_block_size = 16 * 1024 * 1024; // s3fs block size is 16MB -//------------------------------------------------------------------- -// Global functions : prototype -//------------------------------------------------------------------- -int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_size = true); // [NOTE] global function because this is called from FdEntity class - //------------------------------------------------------------------- // Static functions : prototype //------------------------------------------------------------------- @@ -849,7 +845,7 @@ static int get_local_fent(AutoFdEntity& autoent, FdEntity **entity, const char* // create or update s3 meta // @return fuse return code // -int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_size) +int put_headers(const char* path, const headers_t& meta, bool is_copy, bool use_st_size) { int result; off_t size; @@ -876,11 +872,7 @@ int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_siz } if(!nocopyapi && !nomultipart && size >= multipart_threshold){ - // [TODO] - // This object will be removed after removing S3fsMultiCurl - // - S3fsCurl s3fscurl(true); - if(0 != (result = s3fscurl.MultipartHeadRequest(strpath.c_str(), size, meta))){ + if(0 != (result = multipart_put_head_request(strpath, strpath, size, meta))){ return result; } }else{ @@ -1560,14 +1552,9 @@ static int rename_large_object(const char* from, const char* to) return result; } - // [TODO] - // This object will be removed after removing S3fsMultiCurl - // - S3fsCurl s3fscurl(true); - if(0 != (result = s3fscurl.MultipartRenameRequest(from, to, meta, buf.st_size))){ + if(0 != (result = multipart_put_head_request(std::string(from), std::string(to), buf.st_size, meta))){ return result; } - s3fscurl.DestroyCurlHandle(); // Rename cache file FdManager::get()->Rename(from, to); @@ -3124,27 +3111,10 @@ static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf continue; } - // parameter for thread worker - auto* thargs = new multi_head_req_thparam; // free in multi_head_req_threadworker - thargs->psyncfiller = &syncfiller; - thargs->pthparam_lock = &thparam_lock; // for pretrycount and presult member - thargs->pretrycount = &retrycount; - thargs->pnotfound_list = ¬found_list; - thargs->use_wtf8 = use_wtf8; - thargs->path = disppath; - thargs->presult = &req_result; - - // make parameter for thread pool - thpoolman_param ppoolparam; - ppoolparam.args = thargs; - ppoolparam.psem = &multi_head_sem; - ppoolparam.pfunc = multi_head_req_threadworker; - - // setup instruction - if(!ThreadPoolMan::Instruct(ppoolparam)){ - S3FS_PRN_ERR("failed setup instruction for one header request."); - delete thargs; - return -EIO; + // set one head request + int result; + if(0 != (result = multi_head_request(disppath, syncfiller, thparam_lock, retrycount, notfound_list, use_wtf8, req_result, multi_head_sem))){ + return result; } ++req_count; } diff --git a/src/s3fs_threadreqs.cpp b/src/s3fs_threadreqs.cpp index c4fc86d..a199f7a 100644 --- a/src/s3fs_threadreqs.cpp +++ b/src/s3fs_threadreqs.cpp @@ -18,6 +18,8 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ +#include + #include "common.h" #include "s3fs.h" #include "s3fs_threadreqs.h" @@ -354,6 +356,123 @@ void* get_object_req_threadworker(void* arg) return reinterpret_cast(pthparam->result); } +// +// Thread Worker function for multipart put head request +// +void* multipart_put_head_req_threadworker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam || !pthparam->ppartdata || !pthparam->pthparam_lock || !pthparam->pretrycount || !pthparam->presult){ + return reinterpret_cast(-EIO); + } + + // Check retry max count and print debug message + { + const std::lock_guard lock(*(pthparam->pthparam_lock)); + + S3FS_PRN_INFO3("Multipart Put Head Request [from=%s][to=%s][upload_id=%s][part_number=%d][filepart=%p][thparam_lock=%p][retrycount=%d][from=%s][to=%s]", pthparam->from.c_str(), pthparam->to.c_str(), pthparam->upload_id.c_str(), pthparam->part_number, pthparam->ppartdata, pthparam->pthparam_lock, *(pthparam->pretrycount), pthparam->from.c_str(), pthparam->to.c_str()); + + if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){ + S3FS_PRN_ERR("Multipart Put Head request(%s->%s) reached the maximum number of retry count(%d).", pthparam->from.c_str(), pthparam->to.c_str(), *(pthparam->pretrycount)); + return reinterpret_cast(-EIO); + } + } + + S3fsCurl s3fscurl(true); + int result = 0; + while(true){ + // Request + result = s3fscurl.MultipartPutHeadRequest(pthparam->from, pthparam->to, pthparam->part_number, pthparam->upload_id, pthparam->meta); + + // Check result + bool isResetOffset= true; + CURLcode curlCode = s3fscurl.GetCurlCode(); + long responseCode = S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET; + s3fscurl.GetResponseCode(responseCode, false); + + if(CURLE_OK == curlCode){ + if(responseCode < 400){ + // add into stat cache + { + const std::lock_guard lock(*(pthparam->pthparam_lock)); + + std::string etag; + pthparam->ppartdata->uploaded = simple_parse_xml(s3fscurl.GetBodyData().c_str(), s3fscurl.GetBodyData().size(), "ETag", etag); + pthparam->ppartdata->petag->etag = peeloff(etag); + } + result = 0; + break; + + }else if(responseCode == 400){ + // as possibly in multipart + S3FS_PRN_WARN("Put Head Request(%s->%s) got 400 response code.", pthparam->from.c_str(), pthparam->to.c_str()); + + }else if(responseCode == 404){ + // set path to not found list + S3FS_PRN_WARN("Put Head Request(%s->%s) got 404 response code.", pthparam->from.c_str(), pthparam->to.c_str()); + break; + + }else if(responseCode == 500){ + // case of all other result, do retry.(11/13/2013) + // because it was found that s3fs got 500 error from S3, but could success + // to retry it. + S3FS_PRN_WARN("Put Head Request(%s->%s) got 500 response code.", pthparam->from.c_str(), pthparam->to.c_str()); + + // cppcheck-suppress unmatchedSuppression + // cppcheck-suppress knownConditionTrueFalse + }else if(responseCode == S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET){ + // This is a case where the processing result has not yet been updated (should be very rare). + S3FS_PRN_WARN("Put Head Request(%s->%s) could not get any response code.", pthparam->from.c_str(), pthparam->to.c_str()); + + }else{ // including S3fsCurl::S3FSCURL_RESPONSECODE_FATAL_ERROR + // Retry in other case. + S3FS_PRN_WARN("Put Head Request(%s->%s) got fatal response code.", pthparam->from.c_str(), pthparam->to.c_str()); + } + + }else if(CURLE_OPERATION_TIMEDOUT == curlCode){ + S3FS_PRN_ERR("Put Head Request(%s->%s) is timeouted.", pthparam->from.c_str(), pthparam->to.c_str()); + isResetOffset= false; + + }else if(CURLE_PARTIAL_FILE == curlCode){ + S3FS_PRN_WARN("Put Head Request(%s->%s) is recieved data does not match the given size.", pthparam->from.c_str(), pthparam->to.c_str()); + isResetOffset= false; + + }else{ + S3FS_PRN_WARN("Put Head Request(%s->%s) got the result code(%d: %s)", pthparam->from.c_str(), pthparam->to.c_str(), curlCode, curl_easy_strerror(curlCode)); + } + + // Check retry max count + { + const std::lock_guard lock(*(pthparam->pthparam_lock)); + + ++(*(pthparam->pretrycount)); + if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){ + S3FS_PRN_ERR("Put Head Request(%s->%s) reached the maximum number of retry count(%d).", pthparam->from.c_str(), pthparam->to.c_str(), *(pthparam->pretrycount)); + if(0 == result){ + result = -EIO; + } + break; + } + } + + // Setup for retry + if(isResetOffset){ + S3fsCurl::ResetOffset(&s3fscurl); + } + } + + // Set result code + { + const std::lock_guard lock(*(pthparam->pthparam_lock)); + if(0 == *(pthparam->presult) && 0 != result){ + // keep first error + *(pthparam->presult) = result; + } + } + + return reinterpret_cast(result); +} + //------------------------------------------------------------------- // Utility functions //------------------------------------------------------------------- @@ -387,6 +506,36 @@ int head_request(const std::string& strpath, headers_t& header) return 0; } +// +// Calls S3fsCurl::HeadRequest via multi_head_req_threadworker +// +int multi_head_request(const std::string& strpath, SyncFiller& syncfiller, std::mutex& thparam_lock, int& retrycount, s3obj_list_t& notfound_list, bool use_wtf8, int& result, Semaphore& sem) +{ + // parameter for thread worker + auto* thargs = new multi_head_req_thparam; // free in multi_head_req_threadworker + thargs->path = strpath; + thargs->psyncfiller = &syncfiller; + thargs->pthparam_lock = &thparam_lock; // for pretrycount and presult member + thargs->pretrycount = &retrycount; + thargs->pnotfound_list = ¬found_list; + thargs->use_wtf8 = use_wtf8; + thargs->presult = &result; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = thargs; + ppoolparam.psem = &sem; + ppoolparam.pfunc = multi_head_req_threadworker; + + // setup instruction + if(!ThreadPoolMan::Instruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Multi Head Request Thread Worker [path=%s]", strpath.c_str()); + delete thargs; + return -EIO; + } + return 0; +} + // // Calls S3fsCurl::DeleteRequest via delete_req_threadworker // @@ -633,6 +782,107 @@ int abort_multipart_upload_request(const std::string& path, const std::string& u return 0; } +// +// Calls S3fsCurl::MultipartPutHeadRequest via multipart_put_head_req_threadworker +// +int multipart_put_head_request(const std::string& strfrom, const std::string& strto, off_t size, const headers_t& meta) +{ + S3FS_PRN_INFO3("[from=%s][to=%s]", strfrom.c_str(), strto.c_str()); + + bool is_rename = (strfrom != strto); + int result; + std::string upload_id; + off_t chunk; + off_t bytes_remaining; + etaglist_t list; + + // Prepare additional header information for rename + std::string contenttype; + std::string srcresource; + if(is_rename){ + std::string srcurl; // this is not used + MakeUrlResource(get_realpath(strfrom.c_str()).c_str(), srcresource, srcurl); + contenttype = S3fsCurl::LookupMimeType(strto); + } + + // get upload_id + if(0 != (result = pre_multipart_upload_request(strto, meta, upload_id))){ + return result; + } + + // common variables + Semaphore multi_head_sem(0); + std::mutex thparam_lock; + filepart partdata; + int req_count = 0; + int retrycount = 0; + int req_result = 0; + + for(bytes_remaining = size; 0 < bytes_remaining; bytes_remaining -= chunk){ + chunk = bytes_remaining > S3fsCurl::GetMultipartCopySize() ? S3fsCurl::GetMultipartCopySize() : bytes_remaining; + + partdata.add_etag_list(list); + + // parameter for thread worker + auto* thargs = new multipart_put_head_req_thparam; // free in multipart_put_head_req_threadworker + thargs->from = strfrom; + thargs->to = strto; + thargs->upload_id = upload_id; + thargs->part_number = partdata.get_part_number(); + thargs->meta = meta; + thargs->pthparam_lock = &thparam_lock; + thargs->ppartdata = &partdata; + thargs->pretrycount = &retrycount; + thargs->presult = &req_result; + + std::ostringstream strrange; + strrange << "bytes=" << (size - bytes_remaining) << "-" << (size - bytes_remaining + chunk - 1); + thargs->meta["x-amz-copy-source-range"] = strrange.str(); + + if(is_rename){ + thargs->meta["Content-Type"] = contenttype; + thargs->meta["x-amz-copy-source"] = srcresource; + } + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = thargs; + ppoolparam.psem = &multi_head_sem; + ppoolparam.pfunc = multipart_put_head_req_threadworker; + + // setup instruction + if(!ThreadPoolMan::Instruct(ppoolparam)){ + S3FS_PRN_ERR("failed setup instruction for one header request."); + delete thargs; + return -EIO; + } + ++req_count; + } + + // wait for finish all requests + while(req_count > 0){ + multi_head_sem.acquire(); + --req_count; + } + + // check result + if(0 != req_result){ + S3FS_PRN_ERR("error occurred in multi request(errno=%d).", req_result); + int result2; + if(0 != (result2 = abort_multipart_upload_request(strto, upload_id))){ + S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2); + } + return req_result; + } + + // completion process + if(0 != (result = complete_multipart_upload_request(strto, upload_id, list))){ + return result; + } + + return 0; +} + // // Calls S3fsCurl::GetObjectRequest via get_object_req_threadworker // diff --git a/src/s3fs_threadreqs.h b/src/s3fs_threadreqs.h index 601b5eb..f83315b 100644 --- a/src/s3fs_threadreqs.h +++ b/src/s3fs_threadreqs.h @@ -28,6 +28,7 @@ #include "curl.h" #include "s3objlist.h" #include "syncfiller.h" +#include "psemaphore.h" //------------------------------------------------------------------- // Structures for MultiThread Request @@ -144,6 +145,22 @@ struct abort_multipart_upload_req_thparam int result = 0; }; +// +// Multipart Put Head Request parameter structure for Thread Pool. +// +struct multipart_put_head_req_thparam +{ + std::string from; + std::string to; + std::string upload_id; + int part_number = 0; + headers_t meta; + std::mutex* pthparam_lock = nullptr; + filepart* ppartdata = nullptr; + int* pretrycount = nullptr; + int* presult = nullptr; +}; + // // Get Object Request parameter structure for Thread Pool. // @@ -169,12 +186,14 @@ void* check_service_req_threadworker(void* arg); void* pre_multipart_upload_req_threadworker(void* arg); void* complete_multipart_upload_threadworker(void* arg); void* abort_multipart_upload_req_threadworker(void* arg); +void* multipart_put_head_req_threadworker(void* arg); void* get_object_req_threadworker(void* arg); //------------------------------------------------------------------- // Utility functions //------------------------------------------------------------------- int head_request(const std::string& strpath, headers_t& header); +int multi_head_request(const std::string& strpath, SyncFiller& syncfiller, std::mutex& thparam_lock, int& retrycount, s3obj_list_t& notfound_list, bool use_wtf8, int& result, Semaphore& sem); int delete_request(const std::string& strpath); int put_head_request(const std::string& strpath, const headers_t& meta, bool is_copy); int put_request(const std::string& strpath, const headers_t& meta, int fd, bool ahbe); @@ -183,6 +202,7 @@ int check_service_request(const std::string& strpath, bool forceNoSSE, bool supp int pre_multipart_upload_request(const std::string& path, const headers_t& meta, std::string& upload_id); int complete_multipart_upload_request(const std::string& path, const std::string& upload_id, const etaglist_t& parts); int abort_multipart_upload_request(const std::string& path, const std::string& upload_id); +int multipart_put_head_request(const std::string& strfrom, const std::string& strto, off_t size, const headers_t& meta); int get_object_request(const std::string& path, int fd, off_t start, off_t size); //-------------------------------------------------------------------