From c5ebf5d32867b7262e88da32704d76c8430ac0e0 Mon Sep 17 00:00:00 2001
From: Andrew Gaul
Date: Mon, 28 Jan 2019 23:39:11 -0800
Subject: [PATCH] Copy parts in parallel

S3 can copy objects much faster via multipart than single-part
requests due to IO parallelization.  Renaming a 4 GB file drops from
72 to 20 seconds, with bigger gains for larger files.
---
 src/curl.cpp      | 159 +++++++++++++++++++++++++++++++++-------
 src/curl.h        |   5 +-
 src/s3fs.cpp      |   2 +-
 src/s3fs_util.cpp |   2 +-
 4 files changed, 122 insertions(+), 46 deletions(-)

diff --git a/src/curl.cpp b/src/curl.cpp
index c7a4788..fae4c6c 100644
--- a/src/curl.cpp
+++ b/src/curl.cpp
@@ -324,7 +324,9 @@ void CurlHandlerPool::ReturnHandler(CURL* h)
 // Class S3fsCurl
 //-------------------------------------------------------------------
 static const int MULTIPART_SIZE = 10 * 1024 * 1024;
-static const int MAX_MULTI_COPY_SOURCE_SIZE = 500 * 1024 * 1024;
+// constant must be at least 512 MB to copy the maximum 5 TB object size
+// TODO: scale part size with object size
+static const int MAX_MULTI_COPY_SOURCE_SIZE = 512 * 1024 * 1024;
 static const int IAM_EXPIRE_MERGIN = 20 * 60;  // update timing
 static const std::string ECS_IAM_ENV_VAR = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
 
@@ -1318,6 +1320,43 @@ S3fsCurl* S3fsCurl::UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl)
   return newcurl;
 }
 
+S3fsCurl* S3fsCurl::CopyMultipartPostRetryCallback(S3fsCurl* s3fscurl)
+{
+  if(!s3fscurl){
+    return NULL;
+  }
+  // parse and get part_num, upload_id.
+  string upload_id;
+  string part_num_str;
+  int    part_num;
+  if(!get_keyword_value(s3fscurl->url, "uploadId", upload_id)){
+    return NULL;
+  }
+  if(!get_keyword_value(s3fscurl->url, "partNumber", part_num_str)){
+    return NULL;
+  }
+  part_num = atoi(part_num_str.c_str());
+
+  if(s3fscurl->retry_count >= S3fsCurl::retries){
+    S3FS_PRN_ERR("Over retry count(%d) limit(%s:%d).", s3fscurl->retry_count, s3fscurl->path.c_str(), part_num);
+    return NULL;
+  }
+
+  // duplicate request
+  S3fsCurl* newcurl          = new S3fsCurl(s3fscurl->IsUseAhbe());
+  newcurl->partdata.etaglist = s3fscurl->partdata.etaglist;
+  newcurl->partdata.etagpos  = s3fscurl->partdata.etagpos;
+  newcurl->retry_count       = s3fscurl->retry_count + 1;
+
+  // setup new curl object
+  if(0 != newcurl->UploadMultipartPostSetup(s3fscurl->path.c_str(), part_num, upload_id)){
+    S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num);
+    delete newcurl;
+    return NULL;
+  }
+  return newcurl;
+}
+
 int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd)
 {
   int result;
@@ -3554,7 +3593,7 @@ int S3fsCurl::UploadMultipartPostRequest(const char* tpath, int part_num, const
   return result;
 }
 
-int S3fsCurl::CopyMultipartPostRequest(const char* from, const char* to, int part_num, string& upload_id, headers_t& meta)
+int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_num, string& upload_id, headers_t& meta)
 {
   S3FS_PRN_INFO3("[from=%s][to=%s][part=%d]", SAFESTRPTR(from), SAFESTRPTR(to), part_num);
 
@@ -3610,43 +3649,7 @@ int S3fsCurl::CopyMultipartPostRequest(const char* from, const char* to, int par
   // request
   S3FS_PRN_INFO3("copying... [from=%s][to=%s][part=%d]", from, to, part_num);
 
-  int result = RequestPerform();
-  if(0 == result){
-    // parse ETag from response
-    xmlDocPtr doc;
-    if(NULL == (doc = xmlReadMemory(bodydata->str(), bodydata->size(), "", NULL, 0))){
-      return result;
-    }
-    if(NULL == doc->children){
-      S3FS_XMLFREEDOC(doc);
-      return result;
-    }
-    for(xmlNodePtr cur_node = doc->children->children; NULL != cur_node; cur_node = cur_node->next){
-      if(XML_ELEMENT_NODE == cur_node->type){
-        string elementName = reinterpret_cast<const char*>(cur_node->name);
-        if(cur_node->children){
-          if(XML_TEXT_NODE == cur_node->children->type){
-            if(elementName == "ETag") {
-              string etag = reinterpret_cast<const char*>(cur_node->children->content);
-              if(etag.size() >= 2 && *etag.begin() == '"' && *etag.rbegin() == '"'){
-                etag.assign(etag.substr(1, etag.size() - 2));
-              }
-              partdata.etag.assign(etag);
-              partdata.uploaded = true;
-            }
-          }
-        }
-      }
-    }
-    S3FS_XMLFREEDOC(doc);
-  }
-
-  delete bodydata;
-  bodydata = NULL;
-  delete headdata;
-  headdata = NULL;
-
-  return result;
+  return 0;
 }
 
 bool S3fsCurl::UploadMultipartPostComplete()
@@ -3674,6 +3677,53 @@ bool S3fsCurl::UploadMultipartPostComplete()
   return true;
 }
 
+bool S3fsCurl::CopyMultipartPostCallback(S3fsCurl* s3fscurl)
+{
+  if(!s3fscurl){
+    return false;
+  }
+
+  return s3fscurl->CopyMultipartPostComplete();
+}
+
+bool S3fsCurl::CopyMultipartPostComplete()
+{
+  // parse ETag from response
+  xmlDocPtr doc;
+  if(NULL == (doc = xmlReadMemory(bodydata->str(), bodydata->size(), "", NULL, 0))){
+    return false;
+  }
+  if(NULL == doc->children){
+    S3FS_XMLFREEDOC(doc);
+    return false;
+  }
+  for(xmlNodePtr cur_node = doc->children->children; NULL != cur_node; cur_node = cur_node->next){
+    if(XML_ELEMENT_NODE == cur_node->type){
+      string elementName = reinterpret_cast<const char*>(cur_node->name);
+      if(cur_node->children){
+        if(XML_TEXT_NODE == cur_node->children->type){
+          if(elementName == "ETag") {
+            string etag = reinterpret_cast<const char*>(cur_node->children->content);
+            if(etag.size() >= 2 && *etag.begin() == '"' && *etag.rbegin() == '"'){
+              etag.assign(etag.substr(1, etag.size() - 2));
+            }
+            partdata.etaglist->at(partdata.etagpos).assign(etag);
+            partdata.uploaded = true;
+          }
+        }
+      }
+    }
+  }
+  S3FS_XMLFREEDOC(doc);
+
+  delete bodydata;
+  bodydata = NULL;
+  delete headdata;
+  headdata = NULL;
+
+  return true;
+}
+
 int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta, bool is_copy)
 {
   int result;
@@ -3698,7 +3748,7 @@ int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& met
     strrange.str("");
     strrange.clear(stringstream::goodbit);
 
-    if(0 != (result = CopyMultipartPostRequest(tpath, tpath, (list.size() + 1), upload_id, meta))){
+    if(0 != (result = CopyMultipartPostSetup(tpath, tpath, (list.size() + 1), upload_id, meta))){
       return result;
     }
     list.push_back(partdata.etag);
@@ -3830,6 +3880,11 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t
   }
   DestroyCurlHandle();
 
+  // Initialize S3fsMultiCurl
+  S3fsMultiCurl curlmulti(GetMaxParallelCount());
+  curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartPostCallback);
+  curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartPostRetryCallback);
+
   for(bytes_remaining = size, chunk = 0; 0 < bytes_remaining; bytes_remaining -= chunk){
     chunk = bytes_remaining > MAX_MULTI_COPY_SOURCE_SIZE ? MAX_MULTI_COPY_SOURCE_SIZE : bytes_remaining;
 
@@ -3838,11 +3893,29 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t
     strrange.str("");
     strrange.clear(stringstream::goodbit);
 
-    if(0 != (result = CopyMultipartPostRequest(from, to, (list.size() + 1), upload_id, meta))){
+    // s3fscurl sub object
+    S3fsCurl* s3fscurl_para = new S3fsCurl(true);
+    s3fscurl_para->partdata.add_etag_list(&list);
+
+    // initiate upload part for parallel
+    if(0 != (result = s3fscurl_para->CopyMultipartPostSetup(from, to, list.size(), upload_id, meta))){
+      S3FS_PRN_ERR("failed uploading part setup(%d)", result);
+      delete s3fscurl_para;
       return result;
     }
-    list.push_back(partdata.etag);
-    DestroyCurlHandle();
+
+    // set into parallel object
+    if(!curlmulti.SetS3fsCurlObject(s3fscurl_para)){
+      S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", to);
+      delete s3fscurl_para;
+      return -1;
+    }
+  }
+
+  // Multi request
+  if(0 != (result = curlmulti.Request())){
+    S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
+    return result;
   }
 
   if(0 != (result = CompleteMultipartPostRequest(to, upload_id, list))){
diff --git a/src/curl.h b/src/curl.h
index 3563cd8..1b5ea38 100644
--- a/src/curl.h
+++ b/src/curl.h
@@ -309,7 +309,9 @@ class S3fsCurl
     static size_t DownloadWriteCallback(void* ptr, size_t size, size_t nmemb, void* userp);
 
     static bool UploadMultipartPostCallback(S3fsCurl* s3fscurl);
+    static bool CopyMultipartPostCallback(S3fsCurl* s3fscurl);
     static S3fsCurl* UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl);
+    static S3fsCurl* CopyMultipartPostRetryCallback(S3fsCurl* s3fscurl);
     static S3fsCurl* ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl);
 
     static bool ParseIAMCredentialResponse(const char* response, iamcredmap_t& keyval);
@@ -337,8 +339,9 @@ class S3fsCurl
     int GetIAMCredentials(void);
 
     int UploadMultipartPostSetup(const char* tpath, int part_num, const std::string& upload_id);
-    int CopyMultipartPostRequest(const char* from, const char* to, int part_num, std::string& upload_id, headers_t& meta);
+    int CopyMultipartPostSetup(const char* from, const char* to, int part_num, std::string& upload_id, headers_t& meta);
     bool UploadMultipartPostComplete();
+    bool CopyMultipartPostComplete();
 
   public:
     // class methods
diff --git a/src/s3fs.cpp b/src/s3fs.cpp
index 5159336..4a50942 100644
--- a/src/s3fs.cpp
+++ b/src/s3fs.cpp
@@ -133,7 +133,7 @@ static bool is_ecs = false;
 static bool is_ibm_iam_auth = false;
 static bool is_use_xattr = false;
 static bool create_bucket = false;
-static int64_t singlepart_copy_limit = FIVE_GB;
+static int64_t singlepart_copy_limit = 512 * 1024 * 1024;
 static bool is_specified_endpoint = false;
 static int s3fs_init_deferred_exit_status = 0;
 static bool support_compat_dir = true;// default supports compatibility directory type
diff --git a/src/s3fs_util.cpp b/src/s3fs_util.cpp
index fb092ef..aa4f3c7 100644
--- a/src/s3fs_util.cpp
+++ b/src/s3fs_util.cpp
@@ -1197,7 +1197,7 @@ void show_help (void)
    "      space is smaller than this value, s3fs do not use diskspace\n"
    "      as possible in exchange for the performance.\n"
    "\n"
-   "   singlepart_copy_limit (default=\"5120\")\n"
+   "   singlepart_copy_limit (default=\"512\")\n"
    "      - maximum size, in MB, of a single-part copy before trying \n"
    "      multipart copy.\n"
    "\n"
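
For readers following the change: MultipartRenameRequest now builds one S3fsCurl per 512 MiB source range and hands them all to S3fsMultiCurl, so the UploadPartCopy requests run concurrently instead of back to back. Since S3 allows at most 10,000 parts per multipart upload, 512 MiB parts reach 10,000 x 512 MiB, roughly 5.2 TB, which covers S3's 5 TB object-size ceiling. The same strategy in a minimal, self-contained sketch: copy_part() below is a hypothetical stand-in for S3's UploadPartCopy request, and std::thread stands in for the libcurl multiplexing that S3fsMultiCurl actually performs.

// Sketch of the parallel part-copy strategy, independent of s3fs internals.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <string>
#include <thread>
#include <vector>

static const int64_t kPartSize = 512LL * 1024 * 1024;  // MAX_MULTI_COPY_SOURCE_SIZE

// Hypothetical UploadPartCopy wrapper: copies [offset, offset+length) of
// `from` into part `part_num` of the multipart upload at `to` and returns
// the part's ETag.
static std::string copy_part(const std::string& from, const std::string& to,
                             int part_num, int64_t offset, int64_t length)
{
  std::printf("copying [from=%s][to=%s][part=%d][range=%lld-%lld]\n",
              from.c_str(), to.c_str(), part_num,
              static_cast<long long>(offset),
              static_cast<long long>(offset + length - 1));
  return "etag-" + std::to_string(part_num);
}

// Each source range is independent, so all parts can be copied at once;
// only the ETag list handed to CompleteMultipartUpload must stay in part order.
static std::vector<std::string> parallel_copy(const std::string& from,
                                              const std::string& to,
                                              int64_t size)
{
  int parts = static_cast<int>((size + kPartSize - 1) / kPartSize);
  std::vector<std::string> etags(parts);
  std::vector<std::thread> workers;
  for(int i = 0; i < parts; ++i){
    int64_t offset = static_cast<int64_t>(i) * kPartSize;
    int64_t length = std::min(kPartSize, size - offset);
    workers.emplace_back([&etags, &from, &to, i, offset, length](){
      etags[i] = copy_part(from, to, i + 1, offset, length);  // parts are 1-based
    });
  }
  for(std::thread& worker : workers){
    worker.join();
  }
  return etags;  // pass to CompleteMultipartUpload in part order
}

int main()
{
  // A 4 GB object becomes eight 512 MiB part copies issued in parallel,
  // which is where the 72 s -> 20 s rename improvement comes from.
  parallel_copy("bucket/src-object", "bucket/dst-object",
                4LL * 1024 * 1024 * 1024);
  return 0;
}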