Refactored parallel get object request

This commit is contained in:
Takeshi Nakatani 2024-07-15 06:40:05 +00:00 committed by Andrew Gaul
parent 86b5c9d88e
commit 7410b95db2
5 changed files with 194 additions and 80 deletions

View File

@ -1616,83 +1616,6 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me
return 0;
}
//
// Retry callback for one part of a parallel get object request.
//
// Creates a new S3fsCurl object that repeats the download described by
// s3fscurl (same path, fd, start position, size and SSE settings) with
// the retry counter advanced by one.
//
// Returns nullptr when s3fscurl is null, when its retry counter has
// already reached S3fsCurl::retries, or when the new request could not
// be set up.
//
std::unique_ptr<S3fsCurl> S3fsCurl::ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl)
{
    // nothing to duplicate
    if(nullptr == s3fscurl){
        return nullptr;
    }

    // give up once the retry limit has been reached
    if(S3fsCurl::retries <= s3fscurl->retry_count){
        S3FS_PRN_ERR("Over retry count(%d) limit(%s).", s3fscurl->retry_count, s3fscurl->path.c_str());
        return nullptr;
    }

    // set up a new curl object for the same part
    std::unique_ptr<S3fsCurl> retrycurl(new S3fsCurl(s3fscurl->IsUseAhbe()));

    const int setup_result = retrycurl->PreGetObjectRequest(s3fscurl->path.c_str(), s3fscurl->partdata.fd, s3fscurl->partdata.startpos, s3fscurl->partdata.size, s3fscurl->b_ssetype, s3fscurl->b_ssevalue);
    if(0 != setup_result){
        S3FS_PRN_ERR("failed downloading part setup(%d)", setup_result);
        return nullptr;
    }

    retrycurl->retry_count = s3fscurl->retry_count + 1;
    return retrycurl;
}
//
// Downloads the range [start, start + size) of tpath into fd using
// parallel get object requests.
//
// The range is cut into parts of at most S3fsCurl::multipart_size bytes.
// Up to S3fsCurl::max_parallel_cnt parts are registered in one
// S3fsMultiCurl object and requested together; this is repeated until
// no bytes remain.
//
// Returns 0 on success, the error from the part setup or from the multi
// request, or -EIO when a part could not be registered.
//
int S3fsCurl::ParallelGetObjectRequest(const char* tpath, int fd, off_t start, off_t size)
{
    S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd);

    // look up SSE settings (a failure is only reported as a warning)
    sse_type_t  ssetype = sse_type_t::SSE_DISABLE;
    std::string ssevalue;
    if(!get_object_sse_type(tpath, ssetype, ssevalue)){
        S3FS_PRN_WARN("Failed to get SSE type for file(%s).", SAFESTRPTR(tpath));
    }

    int   result     = 0;
    off_t bytes_left = size;

    // one iteration per batch of parallel part requests
    while(0 < bytes_left){
        S3fsMultiCurl curlmulti(GetMaxParallelCount());

        // only the retry callback is needed, no success callback is set
        curlmulti.SetRetryCallback(S3fsCurl::ParallelGetObjectRetryCallback);

        // register the parts of this batch
        for(int queued = 0; queued < S3fsCurl::max_parallel_cnt && 0 < bytes_left; ++queued){
            // part size: multipart size, or whatever is left for the last part
            off_t part_size = bytes_left;
            if(S3fsCurl::multipart_size < bytes_left){
                part_size = S3fsCurl::multipart_size;
            }
            const off_t part_start = start + size - bytes_left;

            // curl object for this part
            std::unique_ptr<S3fsCurl> partcurl(new S3fsCurl(true));
            result = partcurl->PreGetObjectRequest(tpath, fd, part_start, part_size, ssetype, ssevalue);
            if(0 != result){
                S3FS_PRN_ERR("failed downloading part setup(%d)", result);
                return result;
            }

            // hand the part over to the multi curl object
            if(!curlmulti.SetS3fsCurlObject(std::move(partcurl))){
                S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
                return -EIO;
            }

            bytes_left -= part_size;
        }

        // run the batch
        result = curlmulti.Request();
        if(0 != result){
            S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
            break;
        }

        // reinit for the next batch
        curlmulti.Clear();
    }
    return result;
}
bool S3fsCurl::MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){

View File

@ -245,7 +245,6 @@ class S3fsCurl
// Retry callbacks: each one takes the S3fsCurl object of a request and
// returns a std::unique_ptr<S3fsCurl>.
static std::unique_ptr<S3fsCurl> MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> MixMultipartUploadRetryCallback(S3fsCurl* s3fscurl);
// Builds a new get object request for the same path/fd/range with
// retry_count + 1. Returns nullptr for a null argument, when retry_count
// has reached S3fsCurl::retries, or when the request setup fails.
static std::unique_ptr<S3fsCurl> ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl);
// lazy functions for set curl options
static bool MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl);
@ -293,7 +292,6 @@ class S3fsCurl
static std::unique_ptr<S3fsCurl> CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result);
static int ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd);
static int ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages);
// Downloads the range [start, start + size) of tpath into fd in parts of
// at most multipart_size bytes, requesting up to max_parallel_cnt parts
// at a time. Returns 0 on success, otherwise an error code.
static int ParallelGetObjectRequest(const char* tpath, int fd, off_t start, off_t size);
// class methods(variables)
static std::string LookupMimeType(const std::string& name);

View File

@ -1077,7 +1077,7 @@ int FdEntity::Load(off_t start, off_t size, bool is_modified_flag)
// download
if(S3fsCurl::GetMultipartSize() <= need_load_size && !nomultipart){
// parallel request
result = S3fsCurl::ParallelGetObjectRequest(path.c_str(), physical_fd, iter->offset, need_load_size);
result = parallel_get_object_request(path, physical_fd, iter->offset, need_load_size);
}else{
// single request
if(0 < need_load_size){

View File

@ -473,6 +473,116 @@ void* multipart_put_head_req_threadworker(void* arg)
return reinterpret_cast<void*>(result);
}
//
// Thread Worker function for parallel get object request
//
void* parallel_get_object_req_threadworker(void* arg)
{
auto* pthparam = static_cast<parallel_get_object_req_thparam*>(arg);
if(!pthparam || !pthparam->pthparam_lock || !pthparam->pretrycount || !pthparam->presult){
return reinterpret_cast<void*>(-EIO);
}
// Check retry max count and print debug message
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
S3FS_PRN_INFO3("Parallel Get Object Request [path=%s][fd=%d][start=%lld][size=%lld][ssetype=%u][ssevalue=%s]", pthparam->path.c_str(), pthparam->fd, static_cast<long long int>(pthparam->start), static_cast<long long int>(pthparam->size), static_cast<uint8_t>(pthparam->ssetype), pthparam->ssevalue.c_str());
if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){
S3FS_PRN_ERR("Multipart Put Head request(%s) reached the maximum number of retry count(%d).", pthparam->path.c_str(), *(pthparam->pretrycount));
return reinterpret_cast<void*>(-EIO);
}
}
S3fsCurl s3fscurl(true);
int result = 0;
while(true){
// Request
result = s3fscurl.GetObjectRequest(pthparam->path.c_str(), pthparam->fd, pthparam->start, pthparam->size, pthparam->ssetype, pthparam->ssevalue);
// Check result
bool isResetOffset= true;
CURLcode curlCode = s3fscurl.GetCurlCode();
long responseCode = S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET;
s3fscurl.GetResponseCode(responseCode, false);
if(CURLE_OK == curlCode){
if(responseCode < 400){
// nothing to do
result = 0;
break;
}else if(responseCode == 400){
// as possibly in multipart
S3FS_PRN_WARN("Get Object Request(%s) got 400 response code.", pthparam->path.c_str());
}else if(responseCode == 404){
// set path to not found list
S3FS_PRN_WARN("Get Object Request(%s) got 404 response code.", pthparam->path.c_str());
break;
}else if(responseCode == 500){
// case of all other result, do retry.(11/13/2013)
// because it was found that s3fs got 500 error from S3, but could success
// to retry it.
S3FS_PRN_WARN("Get Object Request(%s) got 500 response code.", pthparam->path.c_str());
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress knownConditionTrueFalse
}else if(responseCode == S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET){
// This is a case where the processing result has not yet been updated (should be very rare).
S3FS_PRN_WARN("Get Object Request(%s) could not get any response code.", pthparam->path.c_str());
}else{ // including S3fsCurl::S3FSCURL_RESPONSECODE_FATAL_ERROR
// Retry in other case.
S3FS_PRN_WARN("Get Object Request(%s) got fatal response code.", pthparam->path.c_str());
}
}else if(CURLE_OPERATION_TIMEDOUT == curlCode){
S3FS_PRN_ERR("Get Object Request(%s) is timeouted.", pthparam->path.c_str());
isResetOffset= false;
}else if(CURLE_PARTIAL_FILE == curlCode){
S3FS_PRN_WARN("Get Object Request(%s) is recieved data does not match the given size.", pthparam->path.c_str());
isResetOffset= false;
}else{
S3FS_PRN_WARN("Get Object Request(%s) got the result code(%d: %s)", pthparam->path.c_str(), curlCode, curl_easy_strerror(curlCode));
}
// Check retry max count
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
++(*(pthparam->pretrycount));
if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){
S3FS_PRN_ERR("Parallel Get Object Request(%s) reached the maximum number of retry count(%d).", pthparam->path.c_str(), *(pthparam->pretrycount));
if(0 == result){
result = -EIO;
}
break;
}
}
// Setup for retry
if(isResetOffset){
S3fsCurl::ResetOffset(&s3fscurl);
}
}
// Set result code
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
if(0 == *(pthparam->presult) && 0 != result){
// keep first error
*(pthparam->presult) = result;
}
}
return reinterpret_cast<void*>(result);
}
//-------------------------------------------------------------------
// Utility functions
//-------------------------------------------------------------------
@ -883,6 +993,71 @@ int multipart_put_head_request(const std::string& strfrom, const std::string& st
return 0;
}
//
// Calls S3fsCurl::GetObjectRequest in parallel via parallel_get_object_req_threadworker
//
// Downloads the range [start, start + size) of path into fd. The range is
// cut into parts of at most S3fsCurl::GetMultipartSize() bytes and one
// thread pool instruction is issued per part. All parts use the same
// lock, retry counter and result variable, which live on this function's
// stack; therefore this function never returns before every issued
// instruction has signalled the semaphore.
//
// Returns 0 on success, -EIO when an instruction could not be issued,
// otherwise the first error reported by a worker.
//
int parallel_get_object_request(const std::string& path, int fd, off_t start, off_t size)
{
    S3FS_PRN_INFO3("[path=%s][fd=%d][start=%lld][size=%lld]", path.c_str(), fd, static_cast<long long int>(start), static_cast<long long int>(size));

    // look up SSE settings (a failure is only reported as a warning)
    sse_type_t  ssetype = sse_type_t::SSE_DISABLE;
    std::string ssevalue;
    if(!get_object_sse_type(path.c_str(), ssetype, ssevalue)){
        S3FS_PRN_WARN("Failed to get SSE type for file(%s).", path.c_str());
    }

    Semaphore  para_getobj_sem(0);
    std::mutex thparam_lock;
    int        req_count          = 0;
    int        retrycount         = 0;
    int        req_result         = 0;
    bool       is_instruct_failed = false;

    // cycle through the range, one multipart-size chunk per request
    for(off_t remaining_bytes = size, chunk = 0; 0 < remaining_bytes; remaining_bytes -= chunk){
        // chunk size
        chunk = remaining_bytes > S3fsCurl::GetMultipartSize() ? S3fsCurl::GetMultipartSize() : remaining_bytes;

        // parameter for thread worker
        auto* thargs          = new parallel_get_object_req_thparam;    // free in parallel_get_object_req_threadworker
        thargs->path          = path;
        thargs->fd            = fd;
        thargs->start         = (start + size - remaining_bytes);
        thargs->size          = chunk;
        thargs->ssetype       = ssetype;
        thargs->ssevalue      = ssevalue;
        thargs->pthparam_lock = &thparam_lock;
        thargs->pretrycount   = &retrycount;
        thargs->presult       = &req_result;

        // make parameter for thread pool
        thpoolman_param ppoolparam;
        ppoolparam.args  = thargs;
        ppoolparam.psem  = &para_getobj_sem;
        ppoolparam.pfunc = parallel_get_object_req_threadworker;

        // setup instruction
        if(!ThreadPoolMan::Instruct(ppoolparam)){
            S3FS_PRN_ERR("failed setup instruction for parallel get object request.");
            delete thargs;      // never handed to a worker, so it is still owned here

            // Do not return yet: instructions issued earlier still point at
            // the lock, counters and semaphore on this stack frame.
            is_instruct_failed = true;
            break;
        }
        ++req_count;
    }

    // wait for finish all requests
    while(req_count > 0){
        para_getobj_sem.acquire();
        --req_count;
    }

    // check result
    if(is_instruct_failed){
        return -EIO;
    }
    if(0 != req_result){
        S3FS_PRN_ERR("error occurred in parallel get object request(errno=%d).", req_result);
        return req_result;
    }
    return 0;
}
//
// Calls S3fsCurl::GetObjectRequest via get_object_req_threadworker
//

View File

@ -161,6 +161,22 @@ struct multipart_put_head_req_thparam
int* presult = nullptr;
};
//
// Parameter block handed to parallel_get_object_req_threadworker
// through the Thread Pool (one block per downloaded part).
//
struct parallel_get_object_req_thparam
{
    std::string path;                                   // object path
    int         fd{-1};                                 // file descriptor passed to the get object request
    off_t       start{0};                               // start offset of this part
    off_t       size{0};                                // byte size of this part
    sse_type_t  ssetype{sse_type_t::SSE_DISABLE};       // SSE type passed to the get object request
    std::string ssevalue;                               // SSE value passed to the get object request
    std::mutex* pthparam_lock{nullptr};                 // lock guarding *pretrycount and *presult
    int*        pretrycount{nullptr};                   // retry counter provided by the requester
    int*        presult{nullptr};                       // result variable provided by the requester
};
//
// Get Object Request parameter structure for Thread Pool.
//
@ -187,6 +203,7 @@ void* pre_multipart_upload_req_threadworker(void* arg);
void* complete_multipart_upload_threadworker(void* arg);
void* abort_multipart_upload_req_threadworker(void* arg);
void* multipart_put_head_req_threadworker(void* arg);
// arg points to a parallel_get_object_req_thparam; downloads one part and
// returns 0 or an error code cast to void*.
void* parallel_get_object_req_threadworker(void* arg);
void* get_object_req_threadworker(void* arg);
//-------------------------------------------------------------------
@ -203,6 +220,7 @@ int pre_multipart_upload_request(const std::string& path, const headers_t& meta,
int complete_multipart_upload_request(const std::string& path, const std::string& upload_id, const etaglist_t& parts);
int abort_multipart_upload_request(const std::string& path, const std::string& upload_id);
int multipart_put_head_request(const std::string& strfrom, const std::string& strto, off_t size, const headers_t& meta);
// Downloads the range [start, start + size) of path into fd, one thread
// pool request per multipart-size chunk. Returns 0 on success, otherwise
// an error code.
int parallel_get_object_request(const std::string& path, int fd, off_t start, off_t size);
int get_object_request(const std::string& path, int fd, off_t start, off_t size);
//-------------------------------------------------------------------