Fixed bugs about stream upload

This commit is contained in:
Takeshi Nakatani 2022-07-18 16:03:38 +00:00 committed by Andrew Gaul
parent 136c5ec653
commit 22f2392fca
5 changed files with 124 additions and 121 deletions

View File

@ -664,7 +664,7 @@ int FdEntity::Open(const headers_t* pmeta, off_t size, time_t time, int flags, A
// if there is untreated area, set it to pseudo object. // if there is untreated area, set it to pseudo object.
if(0 < truncated_size){ if(0 < truncated_size){
if(!ppseudoinfo->AddUntreated(truncated_start, truncated_size)){ if(!AddUntreated(truncated_start, truncated_size)){
pseudo_fd_map.erase(pseudo_fd); pseudo_fd_map.erase(pseudo_fd);
if(pfile){ if(pfile){
fclose(pfile); fclose(pfile);
@ -1358,7 +1358,7 @@ int FdEntity::NoCacheCompleteMultipartPost(PseudoFdInfo* pseudo_obj)
s3fscurl.DestroyCurlHandle(); s3fscurl.DestroyCurlHandle();
// clear multipart upload info // clear multipart upload info
pseudo_obj->ClearUntreated(); untreated_list.ClearAll();
pseudo_obj->ClearUploadInfo(); pseudo_obj->ClearUploadInfo();
return 0; return 0;
@ -1491,7 +1491,7 @@ int FdEntity::RowFlushNoMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
// reset uploaded file size // reset uploaded file size
size_orgmeta = st.st_size; size_orgmeta = st.st_size;
pseudo_obj->ClearUntreated(); untreated_list.ClearAll();
if(0 == result){ if(0 == result){
pagelist.ClearAllModified(); pagelist.ClearAllModified();
@ -1568,7 +1568,7 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
// reset uploaded file size // reset uploaded file size
size_orgmeta = st.st_size; size_orgmeta = st.st_size;
} }
pseudo_obj->ClearUntreated(); untreated_list.ClearAll();
}else{ }else{
// Already start uploading // Already start uploading
@ -1576,12 +1576,12 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
// upload rest data // upload rest data
off_t untreated_start = 0; off_t untreated_start = 0;
off_t untreated_size = 0; off_t untreated_size = 0;
if(pseudo_obj->GetLastUntreated(untreated_start, untreated_size, S3fsCurl::GetMultipartSize(), 0) && 0 < untreated_size){ if(untreated_list.GetLastUpdatedPart(untreated_start, untreated_size, S3fsCurl::GetMultipartSize(), 0) && 0 < untreated_size){
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){ if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd); S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
return result; return result;
} }
pseudo_obj->ClearUntreated(untreated_start, untreated_size); untreated_list.ClearParts(untreated_start, untreated_size);
} }
// complete multipart uploading. // complete multipart uploading.
if(0 != (result = NoCacheCompleteMultipartPost(pseudo_obj))){ if(0 != (result = NoCacheCompleteMultipartPost(pseudo_obj))){
@ -1696,7 +1696,7 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
// reset uploaded file size // reset uploaded file size
size_orgmeta = st.st_size; size_orgmeta = st.st_size;
} }
pseudo_obj->ClearUntreated(); untreated_list.ClearAll();
}else{ }else{
// Already start uploading // Already start uploading
@ -1704,12 +1704,12 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
// upload rest data // upload rest data
off_t untreated_start = 0; off_t untreated_start = 0;
off_t untreated_size = 0; off_t untreated_size = 0;
if(pseudo_obj->GetLastUntreated(untreated_start, untreated_size, S3fsCurl::GetMultipartSize(), 0) && 0 < untreated_size){ if(untreated_list.GetLastUpdatedPart(untreated_start, untreated_size, S3fsCurl::GetMultipartSize(), 0) && 0 < untreated_size){
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){ if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd); S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
return result; return result;
} }
pseudo_obj->ClearUntreated(untreated_start, untreated_size); untreated_list.ClearParts(untreated_start, untreated_size);
} }
// complete multipart uploading. // complete multipart uploading.
if(0 != (result = NoCacheCompleteMultipartPost(pseudo_obj))){ if(0 != (result = NoCacheCompleteMultipartPost(pseudo_obj))){
@ -1771,7 +1771,7 @@ int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpat
// reset uploaded file size // reset uploaded file size
size_orgmeta = st.st_size; size_orgmeta = st.st_size;
pseudo_obj->ClearUntreated(); untreated_list.ClearAll();
if(0 == result){ if(0 == result){
pagelist.ClearAllModified(); pagelist.ClearAllModified();
@ -1785,7 +1785,7 @@ int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpat
mp_part_list_t to_copy_list; mp_part_list_t to_copy_list;
mp_part_list_t to_download_list; mp_part_list_t to_download_list;
filepart_list_t cancel_uploaded_list; filepart_list_t cancel_uploaded_list;
if(!pseudo_obj->ExtractUploadPartsFromAllArea(to_upload_list, to_copy_list, to_download_list, cancel_uploaded_list, S3fsCurl::GetMultipartSize(), pagelist.Size(), FdEntity::mixmultipart)){ if(!pseudo_obj->ExtractUploadPartsFromAllArea(untreated_list, to_upload_list, to_copy_list, to_download_list, cancel_uploaded_list, S3fsCurl::GetMultipartSize(), pagelist.Size(), FdEntity::mixmultipart)){
S3FS_PRN_ERR("Failed to extract various upload parts list from all area: errno(EIO)"); S3FS_PRN_ERR("Failed to extract various upload parts list from all area: errno(EIO)");
return -EIO; return -EIO;
} }
@ -1866,13 +1866,13 @@ int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpat
// //
if(!pseudo_obj->ParallelMultipartUploadAll(path.c_str(), to_upload_list, to_copy_list, result)){ if(!pseudo_obj->ParallelMultipartUploadAll(path.c_str(), to_upload_list, to_copy_list, result)){
S3FS_PRN_ERR("Failed to upload multipart parts."); S3FS_PRN_ERR("Failed to upload multipart parts.");
pseudo_obj->ClearUntreated(); untreated_list.ClearAll();
pseudo_obj->ClearUploadInfo(); // clear multipart upload info pseudo_obj->ClearUploadInfo(); // clear multipart upload info
return -EIO; return -EIO;
} }
if(0 != result){ if(0 != result){
S3FS_PRN_ERR("An error(%d) occurred in some threads that were uploading parallel multiparts, but continue to clean up..", result); S3FS_PRN_ERR("An error(%d) occurred in some threads that were uploading parallel multiparts, but continue to clean up..", result);
pseudo_obj->ClearUntreated(); untreated_list.ClearAll();
pseudo_obj->ClearUploadInfo(); // clear multipart upload info pseudo_obj->ClearUploadInfo(); // clear multipart upload info
return result; return result;
} }
@ -1884,20 +1884,20 @@ int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpat
etaglist_t etaglist; etaglist_t etaglist;
if(!pseudo_obj->GetUploadId(upload_id) || !pseudo_obj->GetEtaglist(etaglist)){ if(!pseudo_obj->GetUploadId(upload_id) || !pseudo_obj->GetEtaglist(etaglist)){
S3FS_PRN_ERR("There is no upload id or etag list."); S3FS_PRN_ERR("There is no upload id or etag list.");
pseudo_obj->ClearUntreated(); untreated_list.ClearAll();
pseudo_obj->ClearUploadInfo(); // clear multipart upload info pseudo_obj->ClearUploadInfo(); // clear multipart upload info
return -EIO; return -EIO;
}else{ }else{
S3fsCurl s3fscurl(true); S3fsCurl s3fscurl(true);
if(0 != (result = s3fscurl.CompleteMultipartPostRequest(path.c_str(), upload_id, etaglist))){ if(0 != (result = s3fscurl.CompleteMultipartPostRequest(path.c_str(), upload_id, etaglist))){
S3FS_PRN_ERR("failed to complete multipart upload by errno(%d)", result); S3FS_PRN_ERR("failed to complete multipart upload by errno(%d)", result);
pseudo_obj->ClearUntreated(); untreated_list.ClearAll();
pseudo_obj->ClearUploadInfo(); // clear multipart upload info pseudo_obj->ClearUploadInfo(); // clear multipart upload info
return result; return result;
} }
s3fscurl.DestroyCurlHandle(); s3fscurl.DestroyCurlHandle();
} }
pseudo_obj->ClearUntreated(); untreated_list.ClearAll();
pseudo_obj->ClearUploadInfo(); // clear multipart upload info pseudo_obj->ClearUploadInfo(); // clear multipart upload info
// put pending headers or create new file // put pending headers or create new file
@ -1905,7 +1905,7 @@ int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpat
return result; return result;
} }
} }
pseudo_obj->ClearUntreated(); untreated_list.ClearAll();
if(0 == result){ if(0 == result){
pagelist.ClearAllModified(); pagelist.ClearAllModified();
@ -2028,7 +2028,7 @@ ssize_t FdEntity::Write(int fd, const char* bytes, off_t start, size_t size)
return -errno; return -errno;
} }
// set untreated area // set untreated area
if(!pseudo_obj->AddUntreated(pagelist.Size(), (start - pagelist.Size()))){ if(!AddUntreated(pagelist.Size(), (start - pagelist.Size()))){
S3FS_PRN_ERR("failed to set untreated area by incremental."); S3FS_PRN_ERR("failed to set untreated area by incremental.");
return -EIO; return -EIO;
} }
@ -2101,7 +2101,7 @@ ssize_t FdEntity::WriteNoMultipart(PseudoFdInfo* pseudo_obj, const char* bytes,
} }
if(0 < wsize){ if(0 < wsize){
pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED); pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED);
pseudo_obj->AddUntreated(start, wsize); AddUntreated(start, wsize);
} }
// Load uninitialized area which starts from (start + size) to EOF after writing. // Load uninitialized area which starts from (start + size) to EOF after writing.
@ -2161,8 +2161,7 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of
S3FS_PRN_ERR("failed to load uninitialized area and multipart uploading it(errno=%d)", result); S3FS_PRN_ERR("failed to load uninitialized area and multipart uploading it(errno=%d)", result);
return result; return result;
} }
untreated_list.ClearAll();
pseudo_obj->ClearUntreated();
} }
}else{ }else{
// already start multipart uploading // already start multipart uploading
@ -2176,7 +2175,7 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of
} }
if(0 < wsize){ if(0 < wsize){
pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED); pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED);
pseudo_obj->AddUntreated(start, wsize); AddUntreated(start, wsize);
} }
// Load uninitialized area which starts from (start + size) to EOF after writing. // Load uninitialized area which starts from (start + size) to EOF after writing.
@ -2193,7 +2192,7 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of
// get last untreated part(maximum size is multipart size) // get last untreated part(maximum size is multipart size)
off_t untreated_start = 0; off_t untreated_start = 0;
off_t untreated_size = 0; off_t untreated_size = 0;
if(pseudo_obj->GetLastUntreated(untreated_start, untreated_size, S3fsCurl::GetMultipartSize())){ if(untreated_list.GetLastUpdatedPart(untreated_start, untreated_size, S3fsCurl::GetMultipartSize())){
// when multipart max size is reached // when multipart max size is reached
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){ if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd); S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
@ -2208,7 +2207,7 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of
S3FS_PRN_ERR("failed to truncate file(physical_fd=%d).", physical_fd); S3FS_PRN_ERR("failed to truncate file(physical_fd=%d).", physical_fd);
return -errno; return -errno;
} }
pseudo_obj->ClearUntreated(untreated_start, untreated_size); untreated_list.ClearParts(untreated_start, untreated_size);
} }
} }
return wsize; return wsize;
@ -2249,8 +2248,7 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes,
S3FS_PRN_ERR("failed to load uninitialized area and multipart uploading it(errno=%d)", result); S3FS_PRN_ERR("failed to load uninitialized area and multipart uploading it(errno=%d)", result);
return result; return result;
} }
untreated_list.ClearAll();
pseudo_obj->ClearUntreated();
} }
}else{ }else{
// already start multipart uploading // already start multipart uploading
@ -2264,7 +2262,7 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes,
} }
if(0 < wsize){ if(0 < wsize){
pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED); pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED);
pseudo_obj->AddUntreated(start, wsize); AddUntreated(start, wsize);
} }
// check multipart uploading // check multipart uploading
@ -2272,7 +2270,7 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes,
// get last untreated part(maximum size is multipart size) // get last untreated part(maximum size is multipart size)
off_t untreated_start = 0; off_t untreated_start = 0;
off_t untreated_size = 0; off_t untreated_size = 0;
if(pseudo_obj->GetLastUntreated(untreated_start, untreated_size, S3fsCurl::GetMultipartSize())){ if(untreated_list.GetLastUpdatedPart(untreated_start, untreated_size, S3fsCurl::GetMultipartSize())){
// when multipart max size is reached // when multipart max size is reached
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){ if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd); S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
@ -2287,7 +2285,7 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes,
S3FS_PRN_ERR("failed to truncate file(physical_fd=%d).", physical_fd); S3FS_PRN_ERR("failed to truncate file(physical_fd=%d).", physical_fd);
return -errno; return -errno;
} }
pseudo_obj->ClearUntreated(untreated_start, untreated_size); untreated_list.ClearParts(untreated_start, untreated_size);
} }
} }
return wsize; return wsize;
@ -2317,7 +2315,7 @@ ssize_t FdEntity::WriteStreamUpload(PseudoFdInfo* pseudo_obj, const char* bytes,
} }
if(0 < wsize){ if(0 < wsize){
pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED); pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED);
pseudo_obj->AddUntreated(start, wsize); AddUntreated(start, wsize);
} }
// Check and Upload // Check and Upload
@ -2328,7 +2326,7 @@ ssize_t FdEntity::WriteStreamUpload(PseudoFdInfo* pseudo_obj, const char* bytes,
headers_t tmporgmeta = orgmeta; headers_t tmporgmeta = orgmeta;
bool isuploading = pseudo_obj->IsUploading(); bool isuploading = pseudo_obj->IsUploading();
int result; int result;
if(0 != (result = pseudo_obj->UploadBoundaryLastUntreatedArea(path.c_str(), tmporgmeta))){ if(0 != (result = pseudo_obj->UploadBoundaryLastUntreatedArea(path.c_str(), tmporgmeta, this))){
S3FS_PRN_ERR("Failed to upload the last untreated parts(area) : result=%d", result); S3FS_PRN_ERR("Failed to upload the last untreated parts(area) : result=%d", result);
return result; return result;
} }
@ -2505,6 +2503,46 @@ void FdEntity::MarkDirtyNewFile()
pending_status = CREATE_FILE_PENDING; pending_status = CREATE_FILE_PENDING;
} }
bool FdEntity::AddUntreated(off_t start, off_t size)
{
bool result = untreated_list.AddPart(start, size);
if(!result){
S3FS_PRN_DBG("Failed adding untreated area part.");
}else if(S3fsLog::IsS3fsLogDbg()){
untreated_list.Dump();
}
return result;
}
bool FdEntity::GetLastUpdateUntreatedPart(off_t& start, off_t& size)
{
// Get last untreated area
if(!untreated_list.GetLastUpdatePart(start, size)){
return false;
}
return true;
}
bool FdEntity::ReplaceLastUpdateUntreatedPart(off_t front_start, off_t front_size, off_t behind_start, off_t behind_size)
{
if(0 < front_size){
if(!untreated_list.ReplaceLastUpdatePart(front_start, front_size)){
return false;
}
}else{
if(!untreated_list.RemoveLastUpdatePart()){
return false;
}
}
if(0 < behind_size){
if(!untreated_list.AddPart(behind_start, behind_size)){
return false;
}
}
return true;
}
/* /*
* Local variables: * Local variables:
* tab-width: 4 * tab-width: 4

View File

@ -24,6 +24,7 @@
#include "autolock.h" #include "autolock.h"
#include "fdcache_page.h" #include "fdcache_page.h"
#include "fdcache_fdinfo.h" #include "fdcache_fdinfo.h"
#include "fdcache_untreated.h"
#include "metaheader.h" #include "metaheader.h"
//------------------------------------------------ //------------------------------------------------
@ -50,6 +51,7 @@ class FdEntity
bool is_lock_init; bool is_lock_init;
std::string path; // object path std::string path; // object path
int physical_fd; // physical file(cache or temporary file) descriptor int physical_fd; // physical file(cache or temporary file) descriptor
UntreatedParts untreated_list; // list of untreated parts that have been written and not yet uploaded(for streamupload)
fdinfo_map_t pseudo_fd_map; // pseudo file descriptor information map fdinfo_map_t pseudo_fd_map; // pseudo file descriptor information map
FILE* pfile; // file pointer(tmp file or cache file) FILE* pfile; // file pointer(tmp file or cache file)
ino_t inode; // inode number for cache file ino_t inode; // inode number for cache file
@ -88,6 +90,8 @@ class FdEntity
ssize_t WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size); ssize_t WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
ssize_t WriteStreamUpload(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size); ssize_t WriteStreamUpload(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
bool AddUntreated(off_t start, off_t size);
public: public:
static bool GetNoMixMultipart() { return mixmultipart; } static bool GetNoMixMultipart() { return mixmultipart; }
static bool SetNoMixMultipart(); static bool SetNoMixMultipart();
@ -143,6 +147,9 @@ class FdEntity
bool PunchHole(off_t start = 0, size_t size = 0); bool PunchHole(off_t start = 0, size_t size = 0);
void MarkDirtyNewFile(); void MarkDirtyNewFile();
bool GetLastUpdateUntreatedPart(off_t& start, off_t& size);
bool ReplaceLastUpdateUntreatedPart(off_t front_start, off_t front_size, off_t behind_start, off_t behind_size);
}; };
typedef std::map<std::string, class FdEntity*> fdent_map_t; // key=path, value=FdEntity* typedef std::map<std::string, class FdEntity*> fdent_map_t; // key=path, value=FdEntity*

View File

@ -30,6 +30,7 @@
#include "s3fs.h" #include "s3fs.h"
#include "fdcache_fdinfo.h" #include "fdcache_fdinfo.h"
#include "fdcache_pseudofd.h" #include "fdcache_pseudofd.h"
#include "fdcache_entity.h"
#include "curl.h" #include "curl.h"
#include "string_util.h" #include "string_util.h"
#include "threadpoolman.h" #include "threadpoolman.h"
@ -334,82 +335,18 @@ bool PseudoFdInfo::AppendUploadPart(off_t start, off_t size, bool is_copy, etagp
int partnumber = static_cast<int>(upload_list.size()) + 1; int partnumber = static_cast<int>(upload_list.size()) + 1;
// add new part // add new part
etag_entities.push_back(etagpair(NULL, partnumber)); // [NOTE] Create the etag entity and register it in the list. etagpair* petag_entity = etag_entities.add(etagpair(NULL, partnumber)); // [NOTE] Create the etag entity and register it in the list.
etagpair& etag_entity = etag_entities.back(); filepart newpart(false, physical_fd, start, size, is_copy, petag_entity);
filepart newpart(false, physical_fd, start, size, is_copy, &etag_entity);
upload_list.push_back(newpart); upload_list.push_back(newpart);
// set etag pointer // set etag pointer
if(ppetag){ if(ppetag){
*ppetag = &etag_entity; *ppetag = petag_entity;
} }
return true; return true;
} }
void PseudoFdInfo::ClearUntreated(AutoLock::Type type)
{
AutoLock auto_lock(&upload_list_lock, type);
untreated_list.ClearAll();
}
bool PseudoFdInfo::ClearUntreated(off_t start, off_t size)
{
AutoLock auto_lock(&upload_list_lock);
return untreated_list.ClearParts(start, size);
}
bool PseudoFdInfo::GetLastUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size)
{
AutoLock auto_lock(&upload_list_lock);
return untreated_list.GetLastUpdatedPart(start, size, max_size, min_size);
}
bool PseudoFdInfo::AddUntreated(off_t start, off_t size)
{
AutoLock auto_lock(&upload_list_lock);
bool result = untreated_list.AddPart(start, size);
if(!result){
S3FS_PRN_DBG("Failed adding untreated area part.");
}else if(S3fsLog::IsS3fsLogDbg()){
untreated_list.Dump();
}
return result;
}
bool PseudoFdInfo::GetLastUpdateUntreatedPart(off_t& start, off_t& size)
{
// Get last untreated area
if(!untreated_list.GetLastUpdatePart(start, size)){
return false;
}
return true;
}
bool PseudoFdInfo::ReplaceLastUpdateUntreatedPart(off_t front_start, off_t front_size, off_t behind_start, off_t behind_size)
{
if(0 < front_size){
if(!untreated_list.ReplaceLastUpdatePart(front_start, front_size)){
return false;
}
}else{
if(!untreated_list.RemoveLastUpdatePart()){
return false;
}
}
if(0 < behind_size){
if(!untreated_list.AddPart(behind_start, behind_size)){
return false;
}
}
return true;
}
// //
// Utility for sorting upload list // Utility for sorting upload list
// //
@ -434,16 +371,15 @@ bool PseudoFdInfo::InsertUploadPart(off_t start, off_t size, int part_num, bool
AutoLock auto_lock(&upload_list_lock, type); AutoLock auto_lock(&upload_list_lock, type);
// insert new part // insert new part
etag_entities.push_back(etagpair(NULL, part_num)); etagpair* petag_entity = etag_entities.add(etagpair(NULL, part_num));
etagpair& etag_entity = etag_entities.back(); filepart newpart(false, physical_fd, start, size, is_copy, petag_entity);
filepart newpart(false, physical_fd, start, size, is_copy, &etag_entity);
upload_list.push_back(newpart); upload_list.push_back(newpart);
// sort by part number // sort by part number
upload_list.sort(filepart_partnum_compare); upload_list.sort(filepart_partnum_compare);
// set etag pointer // set etag pointer
*ppetag = &etag_entity; *ppetag = petag_entity;
return true; return true;
} }
@ -540,12 +476,12 @@ bool PseudoFdInfo::ParallelMultipartUploadAll(const char* path, const mp_part_li
// alignment(to backward), and if that gap area is remained, that area is // alignment(to backward), and if that gap area is remained, that area is
// rest to untreated area. // rest to untreated area.
// //
ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_t& meta) ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_t& meta, FdEntity* pfdent)
{ {
S3FS_PRN_DBG("[path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(path), pseudo_fd, physical_fd); S3FS_PRN_DBG("[path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(path), pseudo_fd, physical_fd);
if(!path || -1 == physical_fd || -1 == pseudo_fd){ if(!path || -1 == physical_fd || -1 == pseudo_fd || !pfdent){
S3FS_PRN_ERR("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not writable", pseudo_fd, physical_fd, path); S3FS_PRN_ERR("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not writable, or pfdent is NULL.", pseudo_fd, physical_fd, path);
return -EBADF; return -EBADF;
} }
AutoLock auto_lock(&upload_list_lock); AutoLock auto_lock(&upload_list_lock);
@ -555,7 +491,7 @@ ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_
// //
off_t last_untreated_start = 0; off_t last_untreated_start = 0;
off_t last_untreated_size = 0; off_t last_untreated_size = 0;
if(!GetLastUpdateUntreatedPart(last_untreated_start, last_untreated_size) || last_untreated_start < 0 || last_untreated_size <= 0){ if(!pfdent->GetLastUpdateUntreatedPart(last_untreated_start, last_untreated_size) || last_untreated_start < 0 || last_untreated_size <= 0){
S3FS_PRN_WARN("Not found last update untreated area or it is empty, thus return without any error."); S3FS_PRN_WARN("Not found last update untreated area or it is empty, thus return without any error.");
return 0; return 0;
} }
@ -646,7 +582,7 @@ ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_
off_t behind_rem_start = aligned_start + aligned_size; off_t behind_rem_start = aligned_start + aligned_size;
off_t behind_rem_size = (last_untreated_start + last_untreated_size) - behind_rem_start; off_t behind_rem_size = (last_untreated_start + last_untreated_size) - behind_rem_start;
if(!ReplaceLastUpdateUntreatedPart(front_rem_start, front_rem_size, behind_rem_start, behind_rem_size)){ if(!pfdent->ReplaceLastUpdateUntreatedPart(front_rem_start, front_rem_size, behind_rem_start, behind_rem_size)){
S3FS_PRN_WARN("The last untreated area could not be detected and the uploaded area could not be excluded from it, but continue because it does not affect the overall processing."); S3FS_PRN_WARN("The last untreated area could not be detected and the uploaded area could not be excluded from it, but continue because it does not affect the overall processing.");
} }
@ -787,10 +723,10 @@ bool PseudoFdInfo::ExtractUploadPartsFromUntreatedArea(off_t& untreated_start, o
// use_copy : Specify true if copy multipart upload is available. // use_copy : Specify true if copy multipart upload is available.
// //
// [NOTE] // [NOTE]
// The untreated_list does not change, but upload_list is changed. // The untreated_list in fdentity does not change, but upload_list is changed.
// (If you want to restore it, you can use cancel_upload_list.) // (If you want to restore it, you can use cancel_upload_list.)
// //
bool PseudoFdInfo::ExtractUploadPartsFromAllArea(mp_part_list_t& to_upload_list, mp_part_list_t& to_copy_list, mp_part_list_t& to_download_list, filepart_list_t& cancel_upload_list, off_t max_mp_size, off_t file_size, bool use_copy) bool PseudoFdInfo::ExtractUploadPartsFromAllArea(UntreatedParts& untreated_list, mp_part_list_t& to_upload_list, mp_part_list_t& to_copy_list, mp_part_list_t& to_download_list, filepart_list_t& cancel_upload_list, off_t max_mp_size, off_t file_size, bool use_copy)
{ {
AutoLock auto_lock(&upload_list_lock); AutoLock auto_lock(&upload_list_lock);

View File

@ -26,6 +26,8 @@
#include "metaheader.h" #include "metaheader.h"
#include "autolock.h" #include "autolock.h"
class FdEntity;
//------------------------------------------------ //------------------------------------------------
// Structure of parameters to pass to thread // Structure of parameters to pass to thread
//------------------------------------------------ //------------------------------------------------
@ -61,8 +63,7 @@ class PseudoFdInfo
std::string upload_id; std::string upload_id;
int upload_fd; // duplicated fd for uploading int upload_fd; // duplicated fd for uploading
filepart_list_t upload_list; filepart_list_t upload_list;
UntreatedParts untreated_list; // list of untreated parts that have been written and not yet uploaded(for streamupload) petagpool etag_entities; // list of etag string and part number entities(to maintain the etag entity even if MPPART_INFO is destroyed)
etaglist_t etag_entities; // list of etag string and part number entities(to maintain the etag entity even if MPPART_INFO is destroyed)
bool is_lock_init; bool is_lock_init;
pthread_mutex_t upload_list_lock; // protects upload_id and upload_list pthread_mutex_t upload_list_lock; // protects upload_id and upload_list
Semaphore uploaded_sem; // use a semaphore to trigger an upload completion like event flag Semaphore uploaded_sem; // use a semaphore to trigger an upload completion like event flag
@ -76,8 +77,6 @@ class PseudoFdInfo
bool Clear(); bool Clear();
void CloseUploadFd(AutoLock::Type type = AutoLock::NONE); void CloseUploadFd(AutoLock::Type type = AutoLock::NONE);
bool OpenUploadFd(AutoLock::Type type = AutoLock::NONE); bool OpenUploadFd(AutoLock::Type type = AutoLock::NONE);
bool GetLastUpdateUntreatedPart(off_t& start, off_t& size);
bool ReplaceLastUpdateUntreatedPart(off_t front_start, off_t front_size, off_t behind_start, off_t behind_size);
bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy); bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy);
bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag, AutoLock::Type type = AutoLock::NONE); bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag, AutoLock::Type type = AutoLock::NONE);
int WaitAllThreadsExit(); int WaitAllThreadsExit();
@ -103,14 +102,10 @@ class PseudoFdInfo
bool AppendUploadPart(off_t start, off_t size, bool is_copy = false, etagpair** ppetag = NULL); bool AppendUploadPart(off_t start, off_t size, bool is_copy = false, etagpair** ppetag = NULL);
void ClearUntreated(AutoLock::Type type = AutoLock::NONE);
bool ClearUntreated(off_t start, off_t size);
bool GetLastUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE);
bool AddUntreated(off_t start, off_t size);
bool ParallelMultipartUploadAll(const char* path, const mp_part_list_t& upload_list, const mp_part_list_t& copy_list, int& result); bool ParallelMultipartUploadAll(const char* path, const mp_part_list_t& upload_list, const mp_part_list_t& copy_list, int& result);
ssize_t UploadBoundaryLastUntreatedArea(const char* path, headers_t& meta);
bool ExtractUploadPartsFromAllArea(mp_part_list_t& to_upload_list, mp_part_list_t& to_copy_list, mp_part_list_t& to_download_list, filepart_list_t& cancel_upload_list, off_t max_mp_size, off_t file_size, bool use_copy); ssize_t UploadBoundaryLastUntreatedArea(const char* path, headers_t& meta, FdEntity* pfdent);
bool ExtractUploadPartsFromAllArea(UntreatedParts& untreated_list, mp_part_list_t& to_upload_list, mp_part_list_t& to_copy_list, mp_part_list_t& to_download_list, filepart_list_t& cancel_upload_list, off_t max_mp_size, off_t file_size, bool use_copy);
}; };
typedef std::map<int, class PseudoFdInfo*> fdinfo_map_t; typedef std::map<int, class PseudoFdInfo*> fdinfo_map_t;

View File

@ -198,6 +198,33 @@ struct etagpair
typedef std::list<etagpair> etaglist_t; typedef std::list<etagpair> etaglist_t;
struct petagpool
{
std::list<etagpair*> petaglist;
~petagpool()
{
clear();
}
void clear()
{
for(std::list<etagpair*>::iterator it = petaglist.begin(); petaglist.end() != it; ++it){
if(*it){
delete (*it);
}
}
petaglist.clear();
}
etagpair* add(const etagpair& etag_entity)
{
etagpair* petag = new etagpair(etag_entity);
petaglist.push_back(petag);
return petag;
}
};
// //
// Each part information for Multipart upload // Each part information for Multipart upload
// //