Fixed exclusive control of upload_id in PseudoFdInfo class

This commit is contained in:
Takeshi Nakatani 2024-07-15 10:23:53 +00:00 committed by Andrew Gaul
parent 06a3822965
commit 0c26014812
2 changed files with 97 additions and 56 deletions

View File

@ -107,7 +107,7 @@ void* PseudoFdInfo::MultipartUploadThreadWorker(void* arg)
//------------------------------------------------
// PseudoFdInfo methods
//------------------------------------------------
PseudoFdInfo::PseudoFdInfo(int fd, int open_flags) : pseudo_fd(-1), physical_fd(fd), flags(0), upload_fd(-1), uploaded_sem(0), instruct_count(0), completed_count(0), last_result(0)
PseudoFdInfo::PseudoFdInfo(int fd, int open_flags) : pseudo_fd(-1), physical_fd(fd), flags(0), upload_fd(-1), instruct_count(0), completed_count(0), last_result(0), uploaded_sem(0)
{
if(-1 != physical_fd){
pseudo_fd = PseudoFdManager::Get();
@ -146,6 +146,17 @@ bool PseudoFdInfo::Clear()
return true;
}
bool PseudoFdInfo::IsUploadingHasLock() const
{
return !upload_id.empty();
}
bool PseudoFdInfo::IsUploading() const
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
return IsUploadingHasLock();
}
void PseudoFdInfo::CloseUploadFd()
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
@ -157,6 +168,8 @@ void PseudoFdInfo::CloseUploadFd()
bool PseudoFdInfo::OpenUploadFd()
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
if(-1 != upload_fd){
// already initialized
return true;
@ -269,6 +282,12 @@ bool PseudoFdInfo::RowInitialUploadInfo(const std::string& id, bool is_cancel_mp
return true;
}
void PseudoFdInfo::IncreaseInstructionCount()
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
++instruct_count;
}
bool PseudoFdInfo::CompleteInstruction(int result)
{
if(0 != result){
@ -285,25 +304,34 @@ bool PseudoFdInfo::CompleteInstruction(int result)
return true;
}
bool PseudoFdInfo::GetUploadId(std::string& id) const
bool PseudoFdInfo::GetUploadInfo(std::string& id, int& fd) const
{
if(!IsUploading()){
const std::lock_guard<std::mutex> lock(upload_list_lock);
if(!IsUploadingHasLock()){
S3FS_PRN_ERR("Multipart Upload has not started yet.");
return false;
}
id = upload_id;
fd = upload_fd;
return true;
}
bool PseudoFdInfo::GetUploadId(std::string& id) const
{
int fd = -1;
return GetUploadInfo(id, fd);
}
bool PseudoFdInfo::GetEtaglist(etaglist_t& list) const
{
if(!IsUploading()){
const std::lock_guard<std::mutex> lock(upload_list_lock);
if(!IsUploadingHasLock()){
S3FS_PRN_ERR("Multipart Upload has not started yet.");
return false;
}
const std::lock_guard<std::mutex> lock(upload_list_lock);
list.clear();
for(filepart_list_t::const_iterator iter = upload_list.begin(); iter != upload_list.end(); ++iter){
if(iter->petag){
@ -325,12 +353,13 @@ bool PseudoFdInfo::GetEtaglist(etaglist_t& list) const
//
bool PseudoFdInfo::AppendUploadPart(off_t start, off_t size, bool is_copy, etagpair** ppetag)
{
if(!IsUploading()){
const std::lock_guard<std::mutex> lock(upload_list_lock);
if(!IsUploadingHasLock()){
S3FS_PRN_ERR("Multipart Upload has not started yet.");
return false;
}
const std::lock_guard<std::mutex> lock(upload_list_lock);
off_t next_start_pos = 0;
if(!upload_list.empty()){
next_start_pos = upload_list.back().startpos + upload_list.back().size;
@ -365,9 +394,11 @@ static bool filepart_partnum_compare(const filepart& src1, const filepart& src2)
bool PseudoFdInfo::InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag)
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
//S3FS_PRN_DBG("[start=%lld][size=%lld][part_num=%d][is_copy=%s]", static_cast<long long int>(start), static_cast<long long int>(size), part_num, (is_copy ? "true" : "false"));
if(!IsUploading()){
if(!IsUploadingHasLock()){
S3FS_PRN_ERR("Multipart Upload has not started yet.");
return false;
}
@ -405,6 +436,13 @@ bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_
return false;
}
// Get upload id/fd before loop
std::string tmp_upload_id;
int tmp_upload_fd = -1;
if(!GetUploadInfo(tmp_upload_id, tmp_upload_fd)){
return false;
}
for(mp_part_list_t::const_iterator iter = mplist.begin(); iter != mplist.end(); ++iter){
// Insert upload part
etagpair* petag = nullptr;
@ -417,8 +455,8 @@ bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_
pseudofdinfo_thparam* thargs = new pseudofdinfo_thparam;
thargs->ppseudofdinfo = this;
thargs->path = SAFESTRPTR(path);
thargs->upload_id = upload_id;
thargs->upload_fd = upload_fd;
thargs->upload_id = tmp_upload_id;
thargs->upload_fd = tmp_upload_fd;
thargs->start = iter->start;
thargs->size = iter->size;
thargs->is_copy = is_copy;
@ -437,7 +475,8 @@ bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_
delete thargs;
return false;
}
++instruct_count;
IncreaseInstructionCount();
}
return true;
}
@ -448,16 +487,12 @@ bool PseudoFdInfo::ParallelMultipartUploadAll(const char* path, const mp_part_li
result = 0;
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
if(!OpenUploadFd()){
return false;
}
if(!ParallelMultipartUpload(path, to_upload_list, false) || !ParallelMultipartUpload(path, copy_list, true)){
S3FS_PRN_ERR("Failed setup instruction for uploading(path=%s, to_upload_list=%zu, copy_list=%zu).", SAFESTRPTR(path), to_upload_list.size(), copy_list.size());
return false;
}
if(!OpenUploadFd()){
return false;
}
if(!ParallelMultipartUpload(path, to_upload_list, false) || !ParallelMultipartUpload(path, copy_list, true)){
S3FS_PRN_ERR("Failed setup instruction for uploading(path=%s, to_upload_list=%zu, copy_list=%zu).", SAFESTRPTR(path), to_upload_list.size(), copy_list.size());
return false;
}
// Wait for all thread exiting
@ -577,7 +612,6 @@ ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_
//
// Upload Multipart parts
//
const std::lock_guard<std::mutex> lock(upload_list_lock);
if(!ParallelMultipartUpload(path, to_upload_list, false)){
S3FS_PRN_ERR("Failed to upload multipart parts.");
return -EIO;
@ -696,28 +730,32 @@ bool PseudoFdInfo::ExtractUploadPartsFromUntreatedArea(const off_t& untreated_st
// Also, it is assumed that it must not be a copy area.
// So if the areas overlap, include uploaded area as an untreated area.
//
for(filepart_list_t::iterator cur_iter = upload_list.begin(); cur_iter != upload_list.end(); /* ++cur_iter */){
// Check overlap
if((cur_iter->startpos + cur_iter->size - 1) < aligned_start || (aligned_start + aligned_size - 1) < cur_iter->startpos){
// Areas do not overlap
++cur_iter;
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
}else{
// The areas overlap
//
// Since the start position of the uploaded area is aligned with the boundary,
// it is not necessary to check the start position.
// If the uploaded area exceeds the untreated area, expand the untreated area.
//
if((aligned_start + aligned_size - 1) < (cur_iter->startpos + cur_iter->size - 1)){
aligned_size += (cur_iter->startpos + cur_iter->size) - (aligned_start + aligned_size);
for(filepart_list_t::iterator cur_iter = upload_list.begin(); cur_iter != upload_list.end(); /* ++cur_iter */){
// Check overlap
if((cur_iter->startpos + cur_iter->size - 1) < aligned_start || (aligned_start + aligned_size - 1) < cur_iter->startpos){
// Areas do not overlap
++cur_iter;
}else{
// The areas overlap
//
// Since the start position of the uploaded area is aligned with the boundary,
// it is not necessary to check the start position.
// If the uploaded area exceeds the untreated area, expand the untreated area.
//
if((aligned_start + aligned_size - 1) < (cur_iter->startpos + cur_iter->size - 1)){
aligned_size += (cur_iter->startpos + cur_iter->size) - (aligned_start + aligned_size);
}
//
// Add this to cancel list
//
cancel_upload_list.push_back(*cur_iter); // Copy and Push to cancel list
cur_iter = upload_list.erase(cur_iter);
}
//
// Add this to cancel list
//
cancel_upload_list.push_back(*cur_iter); // Copy and Push to cancel list
cur_iter = upload_list.erase(cur_iter);
}
}

View File

@ -62,29 +62,32 @@ class PseudoFdInfo
int pseudo_fd;
int physical_fd;
int flags; // flags at open
std::string upload_id;
int upload_fd; // duplicated fd for uploading
filepart_list_t upload_list;
petagpool etag_entities; // list of etag string and part number entities(to maintain the etag entity even if MPPART_INFO is destroyed)
mutable std::mutex upload_list_lock; // protects upload_id and upload_list
Semaphore uploaded_sem; // use a semaphore to trigger an upload completion like event flag
int instruct_count; // number of instructions for processing by threads
int completed_count; // number of completed processes by thread
int last_result; // the result of thread processing
mutable std::mutex upload_list_lock; // protects upload_id/fd, upload_list, etc.
std::string upload_id GUARDED_BY(upload_list_lock); //
int upload_fd GUARDED_BY(upload_list_lock); // duplicated fd for uploading
filepart_list_t upload_list GUARDED_BY(upload_list_lock);
petagpool etag_entities GUARDED_BY(upload_list_lock); // list of etag string and part number entities(to maintain the etag entity even if MPPART_INFO is destroyed)
int instruct_count GUARDED_BY(upload_list_lock); // number of instructions for processing by threads
int completed_count GUARDED_BY(upload_list_lock); // number of completed processes by thread
int last_result GUARDED_BY(upload_list_lock); // the result of thread processing
Semaphore uploaded_sem; // use a semaphore to trigger an upload completion like event flag
private:
static void* MultipartUploadThreadWorker(void* arg);
bool Clear();
void CloseUploadFd();
bool OpenUploadFd() REQUIRES(upload_list_lock);
bool OpenUploadFd();
bool ResetUploadInfo() REQUIRES(upload_list_lock);
bool RowInitialUploadInfo(const std::string& id, bool is_cancel_mp);
void IncreaseInstructionCount();
bool CompleteInstruction(int result) REQUIRES(upload_list_lock);
bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy) REQUIRES(upload_list_lock);
bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag) REQUIRES(upload_list_lock);
bool GetUploadInfo(std::string& id, int& fd) const;
bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy);
bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag);
bool CancelAllThreads();
bool ExtractUploadPartsFromUntreatedArea(const off_t& untreated_start, const off_t& untreated_size, mp_part_list_t& to_upload_list, filepart_list_t& cancel_upload_list, off_t max_mp_size);
bool IsUploadingHasLock() const REQUIRES(upload_list_lock);
public:
explicit PseudoFdInfo(int fd = -1, int open_flags = 0);
@ -104,7 +107,7 @@ class PseudoFdInfo
bool ClearUploadInfo(bool is_cancel_mp = false);
bool InitialUploadInfo(const std::string& id){ return RowInitialUploadInfo(id, true); }
bool IsUploading() const { return !upload_id.empty(); }
bool IsUploading() const;
bool GetUploadId(std::string& id) const;
bool GetEtaglist(etaglist_t& list) const;