Fixed data race and memory leaks in PseudoFdInfo

This commit is contained in:
Takeshi Nakatani 2022-07-29 17:27:49 +00:00 committed by Andrew Gaul
parent 07535ec3ec
commit 5a2172dc56
2 changed files with 54 additions and 43 deletions

View File

@ -48,47 +48,38 @@ void* PseudoFdInfo::MultipartUploadThreadWorker(void* arg)
{ {
pseudofdinfo_thparam* pthparam = static_cast<pseudofdinfo_thparam*>(arg); pseudofdinfo_thparam* pthparam = static_cast<pseudofdinfo_thparam*>(arg);
if(!pthparam || !(pthparam->ppseudofdinfo)){ if(!pthparam || !(pthparam->ppseudofdinfo)){
if(pthparam){
delete pthparam;
}
return (void*)(intptr_t)(-EIO); return (void*)(intptr_t)(-EIO);
} }
S3FS_PRN_INFO3("Upload Part Thread [tpath=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num); S3FS_PRN_INFO3("Upload Part Thread [tpath=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
if(0 != pthparam->ppseudofdinfo->last_result){ int result;
S3FS_PRN_DBG("Already occurred error, thus this thread worker is exiting."); S3fsCurl* s3fscurl;
{
AutoLock auto_lock(&(pthparam->ppseudofdinfo->upload_list_lock)); AutoLock auto_lock(&(pthparam->ppseudofdinfo->upload_list_lock));
if(0 < pthparam->ppseudofdinfo->instruct_count){ if(0 != (result = pthparam->ppseudofdinfo->last_result)){
--(pthparam->ppseudofdinfo->instruct_count); S3FS_PRN_DBG("Already occurred error, thus this thread worker is exiting.");
}else{
S3FS_PRN_ERR("Internal error: instruct_count caused an underflow.");
return (void*)(intptr_t)(-EIO);
}
++(pthparam->ppseudofdinfo->completed_count);
return (void*)(intptr_t)(pthparam->ppseudofdinfo->last_result); if(!pthparam->ppseudofdinfo->CompleteInstruction(result, AutoLock::ALREADY_LOCKED)){ // result will be overwritten with the same value.
result = -EIO;
}
delete pthparam;
return (void*)(intptr_t)(result);
}
} }
// setup and make curl object // setup and make curl object
int result = 0;
S3fsCurl* s3fscurl;
if(NULL == (s3fscurl = S3fsCurl::CreateParallelS3fsCurl(pthparam->path.c_str(), pthparam->upload_fd, pthparam->start, pthparam->size, pthparam->part_num, pthparam->is_copy, pthparam->petag, pthparam->upload_id, result))){ if(NULL == (s3fscurl = S3fsCurl::CreateParallelS3fsCurl(pthparam->path.c_str(), pthparam->upload_fd, pthparam->start, pthparam->size, pthparam->part_num, pthparam->is_copy, pthparam->petag, pthparam->upload_id, result))){
S3FS_PRN_ERR("failed creating s3fs curl object for uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num); S3FS_PRN_ERR("failed creating s3fs curl object for uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
// set result for exiting // set result for exiting
AutoLock auto_lock(&(pthparam->ppseudofdinfo->upload_list_lock)); if(!pthparam->ppseudofdinfo->CompleteInstruction(result, AutoLock::NONE)){
result = -EIO;
if(0 < pthparam->ppseudofdinfo->instruct_count){
--(pthparam->ppseudofdinfo->instruct_count);
}else{
S3FS_PRN_ERR("Internal error: instruct_count caused an underflow.");
return (void*)(intptr_t)(-EIO);
}
++(pthparam->ppseudofdinfo->completed_count);
if(0 != result){
pthparam->ppseudofdinfo->last_result = result;
} }
delete pthparam;
return (void*)(intptr_t)(result); return (void*)(intptr_t)(result);
} }
@ -106,18 +97,11 @@ void* PseudoFdInfo::MultipartUploadThreadWorker(void* arg)
delete s3fscurl; delete s3fscurl;
// set result // set result
{ if(!pthparam->ppseudofdinfo->CompleteInstruction(result, AutoLock::NONE)){
AutoLock auto_lock(&(pthparam->ppseudofdinfo->upload_list_lock)); S3FS_PRN_WARN("This thread worker is about to end, so it doesn't return an EIO here and runs to the end.");
if(0 < pthparam->ppseudofdinfo->instruct_count){
--(pthparam->ppseudofdinfo->instruct_count);
}
++(pthparam->ppseudofdinfo->completed_count);
if(0 != result){
pthparam->ppseudofdinfo->last_result = result;
}
} }
delete pthparam;
return (void*)(intptr_t)(result); return (void*)(intptr_t)(result);
} }
@ -273,6 +257,24 @@ bool PseudoFdInfo::InitialUploadInfo(const std::string& id, AutoLock::Type type)
return true; return true;
} }
bool PseudoFdInfo::CompleteInstruction(int result, AutoLock::Type type)
{
AutoLock auto_lock(&upload_list_lock, type);
if(0 != result){
last_result = result;
}
if(0 >= instruct_count){
S3FS_PRN_ERR("Internal error: instruct_count caused an underflow.");
return false;
}
--instruct_count;
++completed_count;
return true;
}
bool PseudoFdInfo::GetUploadId(std::string& id) const bool PseudoFdInfo::GetUploadId(std::string& id) const
{ {
if(!IsUploading()){ if(!IsUploading()){
@ -385,10 +387,12 @@ bool PseudoFdInfo::InsertUploadPart(off_t start, off_t size, int part_num, bool
// This method only launches the upload thread. // This method only launches the upload thread.
// Check the maximum number of threads before calling. // Check the maximum number of threads before calling.
// //
bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy) bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy, AutoLock::Type type)
{ {
//S3FS_PRN_DBG("[path=%s][mplist(%zu)]", SAFESTRPTR(path), mplist.size()); //S3FS_PRN_DBG("[path=%s][mplist(%zu)]", SAFESTRPTR(path), mplist.size());
AutoLock auto_lock(&upload_list_lock, type);
if(mplist.empty()){ if(mplist.empty()){
// nothing to do // nothing to do
return true; return true;
@ -441,11 +445,11 @@ bool PseudoFdInfo::ParallelMultipartUploadAll(const char* path, const mp_part_li
result = 0; result = 0;
if(!OpenUploadFd(AutoLock::ALREADY_LOCKED)){ if(!OpenUploadFd(AutoLock::NONE)){
return false; return false;
} }
if(!ParallelMultipartUpload(path, upload_list, false) || !ParallelMultipartUpload(path, copy_list, true)){ if(!ParallelMultipartUpload(path, upload_list, false, AutoLock::NONE) || !ParallelMultipartUpload(path, copy_list, true, AutoLock::NONE)){
S3FS_PRN_ERR("Failed setup instruction for uploading(path=%s, upload_list=%zu, copy_list=%zu).", SAFESTRPTR(path), upload_list.size(), copy_list.size()); S3FS_PRN_ERR("Failed setup instruction for uploading(path=%s, upload_list=%zu, copy_list=%zu).", SAFESTRPTR(path), upload_list.size(), copy_list.size());
return false; return false;
} }
@ -568,7 +572,7 @@ ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_
// //
// Upload Multipart parts // Upload Multipart parts
// //
if(!ParallelMultipartUpload(path, to_upload_list, false)){ if(!ParallelMultipartUpload(path, to_upload_list, false, AutoLock::ALREADY_LOCKED)){
S3FS_PRN_ERR("Failed to upload multipart parts."); S3FS_PRN_ERR("Failed to upload multipart parts.");
return -EIO; return -EIO;
} }
@ -614,7 +618,13 @@ int PseudoFdInfo::WaitAllThreadsExit()
} }
} }
} }
return last_result;
int result;
{
AutoLock auto_lock(&upload_list_lock);
result = last_result;
}
return result;
} }
// //

View File

@ -77,7 +77,8 @@ class PseudoFdInfo
bool Clear(); bool Clear();
void CloseUploadFd(AutoLock::Type type = AutoLock::NONE); void CloseUploadFd(AutoLock::Type type = AutoLock::NONE);
bool OpenUploadFd(AutoLock::Type type = AutoLock::NONE); bool OpenUploadFd(AutoLock::Type type = AutoLock::NONE);
bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy); bool CompleteInstruction(int result, AutoLock::Type type = AutoLock::NONE);
bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy, AutoLock::Type type = AutoLock::NONE);
bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag, AutoLock::Type type = AutoLock::NONE); bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag, AutoLock::Type type = AutoLock::NONE);
int WaitAllThreadsExit(); int WaitAllThreadsExit();
bool ExtractUploadPartsFromUntreatedArea(off_t& untreated_start, off_t& untreated_size, mp_part_list_t& to_upload_list, filepart_list_t& cancel_upload_list, off_t max_mp_size); bool ExtractUploadPartsFromUntreatedArea(off_t& untreated_start, off_t& untreated_size, mp_part_list_t& to_upload_list, filepart_list_t& cancel_upload_list, off_t max_mp_size);