Strictly reviewed the exclusive control of PseudoFdInfo class

This commit is contained in:
Takeshi Nakatani 2022-08-07 10:22:35 +00:00 committed by Andrew Gaul
parent 3e242d0bad
commit f6d7ff1084
2 changed files with 69 additions and 39 deletions

View File

@ -131,6 +131,8 @@ PseudoFdInfo::PseudoFdInfo(int fd, int open_flags) : pseudo_fd(-1), physical_fd(
PseudoFdInfo::~PseudoFdInfo()
{
Clear(); // call before destrying the mutex
if(is_lock_init){
int result;
if(0 != (result = pthread_mutex_destroy(&upload_list_lock))){
@ -139,25 +141,25 @@ PseudoFdInfo::~PseudoFdInfo()
}
is_lock_init = false;
}
Clear();
}
bool PseudoFdInfo::Clear()
{
CancelAllThreads();
CloseUploadFd();
if(-1 != pseudo_fd){
PseudoFdManager::Release(pseudo_fd);
}
pseudo_fd = -1;
physical_fd = -1;
CloseUploadFd(AutoLock::ALREADY_LOCKED); // [NOTE] already destroy mutex, then do not lock it.
return true;
}
void PseudoFdInfo::CloseUploadFd(AutoLock::Type type)
void PseudoFdInfo::CloseUploadFd()
{
AutoLock auto_lock(&upload_list_lock, type);
AutoLock auto_lock(&upload_list_lock);
if(-1 != upload_fd){
close(upload_fd);
@ -227,17 +229,20 @@ bool PseudoFdInfo::Readable() const
return true;
}
bool PseudoFdInfo::ClearUploadInfo(bool is_cancel_mp, AutoLock::Type type)
bool PseudoFdInfo::ClearUploadInfo(bool is_cancel_mp)
{
if(is_cancel_mp){
if(!CancelAllThreads()){
return false;
}
}
return ResetUploadInfo(AutoLock::NONE);
}
bool PseudoFdInfo::ResetUploadInfo(AutoLock::Type type)
{
AutoLock auto_lock(&upload_list_lock, type);
if(is_cancel_mp){
// [TODO]
// If processing for cancellation is required, it will be processed here. Not implemented yet.
//
S3FS_PRN_DBG("If processing for cancellation is required, it will be processed here.");
}
upload_id.erase();
upload_list.clear();
instruct_count = 0;
@ -247,13 +252,24 @@ bool PseudoFdInfo::ClearUploadInfo(bool is_cancel_mp, AutoLock::Type type)
return true;
}
bool PseudoFdInfo::InitialUploadInfo(const std::string& id, AutoLock::Type type)
bool PseudoFdInfo::RowInitialUploadInfo(const std::string& id, bool is_cancel_mp, AutoLock::Type type)
{
AutoLock auto_lock(&upload_list_lock, type);
if(!ClearUploadInfo(true, AutoLock::ALREADY_LOCKED)){
if(is_cancel_mp && AutoLock::ALREADY_LOCKED == type){
S3FS_PRN_ERR("Internal Error: Could not call this with type=AutoLock::ALREADY_LOCKED and is_cancel_mp=true");
return false;
}
if(is_cancel_mp){
if(!ClearUploadInfo(is_cancel_mp)){
return false;
}
}else{
if(!ResetUploadInfo(type)){
return false;
}
}
AutoLock auto_lock(&upload_list_lock, type);
upload_id = id;
return true;
}
@ -553,7 +569,7 @@ ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_
S3FS_PRN_ERR("failed to setup multipart upload(create upload id) by errno(%d)", result);
return result;
}
if(!InitialUploadInfo(tmp_upload_id, AutoLock::ALREADY_LOCKED)){
if(!RowInitialUploadInfo(tmp_upload_id, false/* not need to cancel */, AutoLock::ALREADY_LOCKED)){
S3FS_PRN_ERR("failed to setup multipart upload(set upload id to object)");
return result;
}
@ -593,23 +609,17 @@ ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_
int PseudoFdInfo::WaitAllThreadsExit()
{
while(true){
{
AutoLock auto_lock(&upload_list_lock);
if(0 == instruct_count && 0 == completed_count){
break;
}
while(uploaded_sem.try_wait()){
if(0 < completed_count){
--completed_count;
}
}
if(0 == instruct_count && 0 == completed_count){
break;
}
int result;
bool is_loop = true;
{
AutoLock auto_lock(&upload_list_lock);
if(0 == instruct_count && 0 == completed_count){
result = last_result;
is_loop = false;
}
}
while(is_loop){
// need to wait the worker exiting
uploaded_sem.wait();
{
@ -617,15 +627,32 @@ int PseudoFdInfo::WaitAllThreadsExit()
if(0 < completed_count){
--completed_count;
}
if(0 == instruct_count && 0 == completed_count){
// break loop
result = last_result;
is_loop = false;
}
}
}
int result;
return result;
}
bool PseudoFdInfo::CancelAllThreads()
{
bool need_cancel = false;
{
AutoLock auto_lock(&upload_list_lock);
result = last_result;
if(0 < instruct_count && 0 < completed_count){
S3FS_PRN_INFO("The upload thread is running, so cancel them and wait for the end.");
need_cancel = true;
last_result = -ECANCELED; // to stop thread running
}
}
return result;
if(need_cancel){
WaitAllThreadsExit();
}
return true;
}
//

View File

@ -76,12 +76,15 @@ class PseudoFdInfo
static void* MultipartUploadThreadWorker(void* arg);
bool Clear();
void CloseUploadFd(AutoLock::Type type = AutoLock::NONE);
void CloseUploadFd();
bool OpenUploadFd(AutoLock::Type type = AutoLock::NONE);
bool ResetUploadInfo(AutoLock::Type type);
bool RowInitialUploadInfo(const std::string& id, bool is_cancel_mp, AutoLock::Type type);
bool CompleteInstruction(int result, AutoLock::Type type = AutoLock::NONE);
bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy, AutoLock::Type type = AutoLock::NONE);
bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag, AutoLock::Type type = AutoLock::NONE);
int WaitAllThreadsExit();
bool CancelAllThreads();
bool ExtractUploadPartsFromUntreatedArea(off_t& untreated_start, off_t& untreated_size, mp_part_list_t& to_upload_list, filepart_list_t& cancel_upload_list, off_t max_mp_size);
public:
@ -95,8 +98,8 @@ class PseudoFdInfo
bool Readable() const;
bool Set(int fd, int open_flags);
bool ClearUploadInfo(bool is_clear_part = false, AutoLock::Type type = AutoLock::NONE);
bool InitialUploadInfo(const std::string& id, AutoLock::Type type = AutoLock::NONE);
bool ClearUploadInfo(bool is_cancel_mp = false);
bool InitialUploadInfo(const std::string& id){ return RowInitialUploadInfo(id, true, AutoLock::NONE); }
bool IsUploading() const { return !upload_id.empty(); }
bool GetUploadId(std::string& id) const;