Copy parts in parallel

S3 can copy the parts of a multipart object in parallel much faster
than a single-part copy due to IO parallelization.  Renaming a 4 GB
file drops from 72 to 20 seconds, with bigger gains for larger files.
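
The speedup comes from issuing the per-part UploadPartCopy requests
concurrently instead of sequentially. The sketch below illustrates only
the shape of that idea; copy_part is a hypothetical stand-in for one S3
UploadPartCopy call, and the commit itself drives libcurl handles
through S3fsMultiCurl rather than std::async.

#include <algorithm>
#include <cstdint>
#include <future>
#include <string>
#include <vector>

// Hypothetical stand-in for one S3 UploadPartCopy request covering
// bytes [off, off+len) of the source; returns the new part's ETag.
static std::string copy_part(const std::string& from, const std::string& to,
                             int part_num, int64_t off, int64_t len)
{
  return "\"etag-of-part-" + std::to_string(part_num) + "\"";
}

// Split [0, size) into fixed-size chunks and copy all parts concurrently.
static std::vector<std::string> parallel_copy(const std::string& from,
                                              const std::string& to,
                                              int64_t size, int64_t chunk)
{
  std::vector<std::future<std::string> > parts;
  int part_num = 1;
  for(int64_t off = 0; off < size; off += chunk, ++part_num){
    int64_t len = std::min(chunk, size - off);
    parts.push_back(std::async(std::launch::async, copy_part, from, to,
                               part_num, off, len));
  }
  std::vector<std::string> etags;
  for(size_t i = 0; i < parts.size(); ++i){
    etags.push_back(parts[i].get());   // collect ETags for the completion call
  }
  return etags;
}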
Andrew Gaul 2019-01-28 23:39:11 -08:00
parent a442e843be
commit c5ebf5d328
4 changed files with 122 additions and 46 deletions


@@ -324,7 +324,9 @@ void CurlHandlerPool::ReturnHandler(CURL* h)
// Class S3fsCurl
//-------------------------------------------------------------------
static const int MULTIPART_SIZE = 10 * 1024 * 1024;
-static const int MAX_MULTI_COPY_SOURCE_SIZE = 500 * 1024 * 1024;
+// constant must be at least 512 MB to copy the maximum 5 TB object size
+// TODO: scale part size with object size
+static const int MAX_MULTI_COPY_SOURCE_SIZE = 512 * 1024 * 1024;
static const int IAM_EXPIRE_MERGIN = 20 * 60; // update timing
static const std::string ECS_IAM_ENV_VAR = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
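
A quick check of the comment's arithmetic, assuming S3's documented
limit of 10,000 parts per multipart upload and reading the 5 TB maximum
object size as decimal 5 * 10^12 bytes (this check is illustrative, not
part of the commit):

#include <stdint.h>

// 10,000 parts (the S3 per-upload maximum) of 512 MiB each must cover
// a maximal 5 TB object, taking 5 TB as 5 * 10^12 bytes.
static const uint64_t MAX_PARTS       = 10000ULL;
static const uint64_t PART_SIZE       = 512ULL * 1024 * 1024;
static const uint64_t MAX_OBJECT_SIZE = 5000000000000ULL;

static_assert(MAX_PARTS * PART_SIZE >= MAX_OBJECT_SIZE,
              "part size too small to copy a maximal object");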
@@ -1318,6 +1320,43 @@ S3fsCurl* S3fsCurl::UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl)
return newcurl;
}
+S3fsCurl* S3fsCurl::CopyMultipartPostRetryCallback(S3fsCurl* s3fscurl)
+{
+if(!s3fscurl){
+return NULL;
+}
+// parse and get part_num, upload_id.
+string upload_id;
+string part_num_str;
+int part_num;
+if(!get_keyword_value(s3fscurl->url, "uploadId", upload_id)){
+return NULL;
+}
+if(!get_keyword_value(s3fscurl->url, "partNumber", part_num_str)){
+return NULL;
+}
+part_num = atoi(part_num_str.c_str());
+if(s3fscurl->retry_count >= S3fsCurl::retries){
+S3FS_PRN_ERR("Over retry count(%d) limit(%s:%d).", s3fscurl->retry_count, s3fscurl->path.c_str(), part_num);
+return NULL;
+}
+// duplicate request
+S3fsCurl* newcurl = new S3fsCurl(s3fscurl->IsUseAhbe());
+newcurl->partdata.etaglist = s3fscurl->partdata.etaglist;
+newcurl->partdata.etagpos = s3fscurl->partdata.etagpos;
+newcurl->retry_count = s3fscurl->retry_count + 1;
+// setup new curl object
+if(0 != newcurl->UploadMultipartPostSetup(s3fscurl->path.c_str(), part_num, upload_id)){
+S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num);
+delete newcurl;
+return NULL;
+}
+return newcurl;
+}
int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd)
{
int result;
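
CopyMultipartPostRetryCallback above recovers uploadId and partNumber
by scanning the failed request's URL, since that is the only state the
retry has. get_keyword_value is an existing s3fs helper; a rough,
hypothetical equivalent of that kind of query-string lookup:

#include <string>

// Hypothetical sketch, not s3fs's get_keyword_value: find "key=value"
// in a URL query string, e.g. "...?uploadId=abc&partNumber=3".
static bool extract_query_param(const std::string& url,
                                const std::string& key, std::string& value)
{
  std::string needle = key + "=";
  std::string::size_type pos = url.find(needle);
  if(std::string::npos == pos){
    return false;
  }
  std::string::size_type start = pos + needle.size();
  std::string::size_type end   = url.find('&', start);
  value = (std::string::npos == end) ? url.substr(start)
                                     : url.substr(start, end - start);
  return !value.empty();
}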
@@ -3554,7 +3593,7 @@ int S3fsCurl::UploadMultipartPostRequest(const char* tpath, int part_num, const
return result;
}
-int S3fsCurl::CopyMultipartPostRequest(const char* from, const char* to, int part_num, string& upload_id, headers_t& meta)
+int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_num, string& upload_id, headers_t& meta)
{
S3FS_PRN_INFO3("[from=%s][to=%s][part=%d]", SAFESTRPTR(from), SAFESTRPTR(to), part_num);
@@ -3610,43 +3649,7 @@ int S3fsCurl::CopyMultipartPostRequest(const char* from, const char* to, int par
// request
S3FS_PRN_INFO3("copying... [from=%s][to=%s][part=%d]", from, to, part_num);
-int result = RequestPerform();
-if(0 == result){
-// parse ETag from response
-xmlDocPtr doc;
-if(NULL == (doc = xmlReadMemory(bodydata->str(), bodydata->size(), "", NULL, 0))){
-return result;
-}
-if(NULL == doc->children){
-S3FS_XMLFREEDOC(doc);
-return result;
-}
-for(xmlNodePtr cur_node = doc->children->children; NULL != cur_node; cur_node = cur_node->next){
-if(XML_ELEMENT_NODE == cur_node->type){
-string elementName = reinterpret_cast<const char*>(cur_node->name);
-if(cur_node->children){
-if(XML_TEXT_NODE == cur_node->children->type){
-if(elementName == "ETag") {
-string etag = reinterpret_cast<const char *>(cur_node->children->content);
-if(etag.size() >= 2 && *etag.begin() == '"' && *etag.rbegin() == '"'){
-etag.assign(etag.substr(1, etag.size() - 2));
-}
-partdata.etag.assign(etag);
-partdata.uploaded = true;
-}
-}
-}
-}
-}
-S3FS_XMLFREEDOC(doc);
-}
-delete bodydata;
-bodydata = NULL;
-delete headdata;
-headdata = NULL;
-return result;
+return 0;
}
bool S3fsCurl::UploadMultipartPostComplete()
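
The rename from CopyMultipartPostRequest to CopyMultipartPostSetup
reflects the new split: each object now only prepares its handle, and
the transfers are performed together and harvested afterwards. The
snippet below shows that prepare/perform/harvest pattern with the raw
libcurl multi API (s3fs wraps this differently inside S3fsMultiCurl;
the URLs are placeholders):

#include <curl/curl.h>
#include <vector>

int main(void)
{
  curl_global_init(CURL_GLOBAL_ALL);

  // "Setup" phase: create handles but perform nothing yet.
  const char* urls[] = { "https://example.com/a", "https://example.com/b" };
  CURLM* multi = curl_multi_init();
  std::vector<CURL*> handles;
  for(int i = 0; i < 2; i++){
    CURL* easy = curl_easy_init();
    curl_easy_setopt(easy, CURLOPT_URL, urls[i]);
    curl_multi_add_handle(multi, easy);
    handles.push_back(easy);
  }

  // "Perform" phase: drive all transfers in parallel.
  int still_running = 0;
  do{
    curl_multi_perform(multi, &still_running);
    curl_multi_wait(multi, NULL, 0, 1000, NULL);   // wait for activity
  }while(still_running);

  // Completion phase: harvest results (success callbacks would go here).
  CURLMsg* msg;
  int msgs_left;
  while(NULL != (msg = curl_multi_info_read(multi, &msgs_left))){
    if(CURLMSG_DONE == msg->msg){
      // msg->easy_handle finished with msg->data.result
    }
  }
  for(size_t i = 0; i < handles.size(); i++){
    curl_multi_remove_handle(multi, handles[i]);
    curl_easy_cleanup(handles[i]);
  }
  curl_multi_cleanup(multi);
  curl_global_cleanup();
  return 0;
}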
@@ -3674,6 +3677,53 @@ bool S3fsCurl::UploadMultipartPostComplete()
return true;
}
+bool S3fsCurl::CopyMultipartPostCallback(S3fsCurl* s3fscurl)
+{
+if(!s3fscurl){
+return false;
+}
+return s3fscurl->CopyMultipartPostComplete();
+}
+bool S3fsCurl::CopyMultipartPostComplete()
+{
+// parse ETag from response
+xmlDocPtr doc;
+if(NULL == (doc = xmlReadMemory(bodydata->str(), bodydata->size(), "", NULL, 0))){
+return false;
+}
+if(NULL == doc->children){
+S3FS_XMLFREEDOC(doc);
+return false;
+}
+for(xmlNodePtr cur_node = doc->children->children; NULL != cur_node; cur_node = cur_node->next){
+if(XML_ELEMENT_NODE == cur_node->type){
+string elementName = reinterpret_cast<const char*>(cur_node->name);
+if(cur_node->children){
+if(XML_TEXT_NODE == cur_node->children->type){
+if(elementName == "ETag") {
+string etag = reinterpret_cast<const char *>(cur_node->children->content);
+if(etag.size() >= 2 && *etag.begin() == '"' && *etag.rbegin() == '"'){
+etag.assign(etag.substr(1, etag.size() - 2));
+}
+partdata.etaglist->at(partdata.etagpos).assign(etag);
+partdata.uploaded = true;
+}
+}
+}
+}
+}
+S3FS_XMLFREEDOC(doc);
+delete bodydata;
+bodydata = NULL;
+delete headdata;
+headdata = NULL;
+return true;
+}
int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta, bool is_copy)
{
int result;
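
The body that CopyMultipartPostComplete parses is S3's UploadPartCopy
response, a small CopyPartResult document. A self-contained
demonstration of the same extract-and-unquote logic against a sample
body (values made up; assumes libxml2):

#include <libxml/parser.h>
#include <cstdio>
#include <cstring>
#include <string>

int main(void)
{
  // Sample UploadPartCopy response body (values are made up).
  const char* body =
    "<CopyPartResult>"
    "<LastModified>2019-01-29T00:00:00.000Z</LastModified>"
    "<ETag>\"b54357faf0632cce46e942fa68356b38\"</ETag>"
    "</CopyPartResult>";

  xmlDocPtr doc = xmlReadMemory(body, (int)strlen(body), "", NULL, 0);
  if(NULL == doc || NULL == doc->children){
    return 1;
  }
  for(xmlNodePtr cur = doc->children->children; NULL != cur; cur = cur->next){
    if(XML_ELEMENT_NODE == cur->type && cur->children &&
       XML_TEXT_NODE == cur->children->type &&
       0 == strcmp(reinterpret_cast<const char*>(cur->name), "ETag")){
      std::string etag = reinterpret_cast<const char*>(cur->children->content);
      // S3 returns the ETag wrapped in double quotes; strip them.
      if(etag.size() >= 2 && '"' == *etag.begin() && '"' == *etag.rbegin()){
        etag = etag.substr(1, etag.size() - 2);
      }
      printf("part ETag: %s\n", etag.c_str());
    }
  }
  xmlFreeDoc(doc);
  return 0;
}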
@@ -3698,7 +3748,7 @@ int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& met
strrange.str("");
strrange.clear(stringstream::goodbit);
-if(0 != (result = CopyMultipartPostRequest(tpath, tpath, (list.size() + 1), upload_id, meta))){
+if(0 != (result = CopyMultipartPostSetup(tpath, tpath, (list.size() + 1), upload_id, meta))){
return result;
}
list.push_back(partdata.etag);
@@ -3830,6 +3880,11 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t
}
DestroyCurlHandle();
+// Initialize S3fsMultiCurl
+S3fsMultiCurl curlmulti(GetMaxParallelCount());
+curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartPostCallback);
+curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartPostRetryCallback);
for(bytes_remaining = size, chunk = 0; 0 < bytes_remaining; bytes_remaining -= chunk){
chunk = bytes_remaining > MAX_MULTI_COPY_SOURCE_SIZE ? MAX_MULTI_COPY_SOURCE_SIZE : bytes_remaining;
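
Each pass of this loop slices off the next chunk and formats it as the
bytes=start-end value for the x-amz-copy-source-range header of S3's
UploadPartCopy API. For a 4 GiB object at the new 512 MiB part size the
loop produces 8 ranges, as this standalone sketch of the same
arithmetic shows:

#include <cinttypes>
#include <cstdint>
#include <cstdio>

int main(void)
{
  const int64_t kChunk = 512LL * 1024 * 1024;   // MAX_MULTI_COPY_SOURCE_SIZE
  int64_t size = 4LL * 1024 * 1024 * 1024;      // 4 GiB object
  int64_t chunk;
  for(int64_t remaining = size; 0 < remaining; remaining -= chunk){
    chunk = remaining > kChunk ? kChunk : remaining;
    // Same formula the loop uses for the copy-source range.
    printf("bytes=%" PRId64 "-%" PRId64 "\n",
           size - remaining, size - remaining + chunk - 1);
  }
  return 0;  // prints bytes=0-536870911 ... bytes=3758096384-4294967295
}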
@@ -3838,11 +3893,29 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t
strrange.str("");
strrange.clear(stringstream::goodbit);
-if(0 != (result = CopyMultipartPostRequest(from, to, (list.size() + 1), upload_id, meta))){
+// s3fscurl sub object
+S3fsCurl* s3fscurl_para = new S3fsCurl(true);
+s3fscurl_para->partdata.add_etag_list(&list);
+// initiate upload part for parallel
+if(0 != (result = s3fscurl_para->CopyMultipartPostSetup(from, to, list.size(), upload_id, meta))){
+S3FS_PRN_ERR("failed uploading part setup(%d)", result);
+delete s3fscurl_para;
+return result;
+}
-list.push_back(partdata.etag);
-DestroyCurlHandle();
+// set into parallel object
+if(!curlmulti.SetS3fsCurlObject(s3fscurl_para)){
+S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", to);
+delete s3fscurl_para;
+return -1;
+}
}
+// Multi request
+if(0 != (result = curlmulti.Request())){
+S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
+return result;
+}
if(0 != (result = CompleteMultipartPostRequest(to, upload_id, list))){
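
After curlmulti.Request() returns, every slot in list holds its part's
ETag, and CompleteMultipartPostRequest stitches the parts together with
S3's CompleteMultipartUpload call. Its request body pairs each 1-based
part number with the collected ETag; a hypothetical sketch of that
serialization (not s3fs's actual code):

#include <sstream>
#include <string>
#include <vector>

// Hypothetical sketch: serialize collected part ETags into the XML body
// that S3's CompleteMultipartUpload expects.  Part numbers are 1-based
// and must match the order the parts were copied in.
std::string build_complete_body(const std::vector<std::string>& etags)
{
  std::ostringstream body;
  body << "<CompleteMultipartUpload>";
  for(size_t i = 0; i < etags.size(); i++){
    body << "<Part>"
         << "<PartNumber>" << (i + 1) << "</PartNumber>"
         << "<ETag>" << etags[i] << "</ETag>"
         << "</Part>";
  }
  body << "</CompleteMultipartUpload>";
  return body.str();
}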


@@ -309,7 +309,9 @@ class S3fsCurl
static size_t DownloadWriteCallback(void* ptr, size_t size, size_t nmemb, void* userp);
static bool UploadMultipartPostCallback(S3fsCurl* s3fscurl);
+static bool CopyMultipartPostCallback(S3fsCurl* s3fscurl);
static S3fsCurl* UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl);
+static S3fsCurl* CopyMultipartPostRetryCallback(S3fsCurl* s3fscurl);
static S3fsCurl* ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl);
static bool ParseIAMCredentialResponse(const char* response, iamcredmap_t& keyval);
@@ -337,8 +339,9 @@ class S3fsCurl
int GetIAMCredentials(void);
int UploadMultipartPostSetup(const char* tpath, int part_num, const std::string& upload_id);
-int CopyMultipartPostRequest(const char* from, const char* to, int part_num, std::string& upload_id, headers_t& meta);
+int CopyMultipartPostSetup(const char* from, const char* to, int part_num, std::string& upload_id, headers_t& meta);
bool UploadMultipartPostComplete();
+bool CopyMultipartPostComplete();
public:
// class methods


@@ -133,7 +133,7 @@ static bool is_ecs = false;
static bool is_ibm_iam_auth = false;
static bool is_use_xattr = false;
static bool create_bucket = false;
-static int64_t singlepart_copy_limit = FIVE_GB;
+static int64_t singlepart_copy_limit = 512 * 1024 * 1024;
static bool is_specified_endpoint = false;
static int s3fs_init_deferred_exit_status = 0;
static bool support_compat_dir = true;// default supports compatibility directory type


@@ -1197,7 +1197,7 @@ void show_help (void)
" space is smaller than this value, s3fs do not use diskspace\n"
" as possible in exchange for the performance.\n"
"\n"
" singlepart_copy_limit (default=\"5120\")\n"
" singlepart_copy_limit (default=\"512\")\n"
" - maximum size, in MB, of a single-part copy before trying \n"
" multipart copy.\n"
"\n"