diff --git a/src/Makefile.am b/src/Makefile.am index 408fc18..985b5b7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -49,6 +49,7 @@ s3fs_SOURCES = \ fdcache_auto.cpp \ fdcache_fdinfo.cpp \ fdcache_pseudofd.cpp \ + fdcache_untreated.cpp \ addhead.cpp \ sighandlers.cpp \ autolock.cpp \ diff --git a/src/fdcache_entity.cpp b/src/fdcache_entity.cpp index e6fecb4..6f2e243 100644 --- a/src/fdcache_entity.cpp +++ b/src/fdcache_entity.cpp @@ -1426,6 +1426,8 @@ int FdEntity::RowFlushNoMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) // reset uploaded file size size_orgmeta = st.st_size; + pseudo_obj->ClearUntreated(); + if(0 == result){ pagelist.ClearAllModified(); } @@ -1501,6 +1503,7 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) // reset uploaded file size size_orgmeta = st.st_size; } + pseudo_obj->ClearUntreated(); }else{ // Already start uploading @@ -1508,14 +1511,12 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) // upload rest data off_t untreated_start = 0; off_t untreated_size = 0; - pseudo_obj->GetUntreated(untreated_start, untreated_size); - - if(0 < untreated_size){ + if(pseudo_obj->GetLastUntreated(untreated_start, untreated_size, S3fsCurl::GetMultipartSize()) && 0 < untreated_size){ if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){ S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(untreated_start), static_cast(untreated_size), physical_fd); return result; } - pseudo_obj->ClearUntreated(); + pseudo_obj->ClearUntreated(untreated_start, untreated_size); } // complete multipart uploading. if(0 != (result = NoCacheCompleteMultipartPost(pseudo_obj))){ @@ -1629,6 +1630,7 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) // reset uploaded file size size_orgmeta = st.st_size; } + pseudo_obj->ClearUntreated(); }else{ // Already start uploading @@ -1636,15 +1638,13 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) // upload rest data off_t untreated_start = 0; off_t untreated_size = 0; - pseudo_obj->GetUntreated(untreated_start, untreated_size); - - if(0 < untreated_size){ + if(pseudo_obj->GetLastUntreated(untreated_start, untreated_size, S3fsCurl::GetMultipartSize()) && 0 < untreated_size){ if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){ S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(untreated_start), static_cast(untreated_size), physical_fd); return result; } - pseudo_obj->ClearUntreated(); - } + pseudo_obj->ClearUntreated(untreated_start, untreated_size); + } // complete multipart uploading. if(0 != (result = NoCacheCompleteMultipartPost(pseudo_obj))){ S3FS_PRN_ERR("failed to complete(finish) multipart post for file(physical_fd=%d).", physical_fd); @@ -1845,6 +1845,7 @@ ssize_t FdEntity::WriteNoMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, } if(0 < wsize){ pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED); + pseudo_obj->AddUntreated(start, wsize); } // Load uninitialized area which starts from (start + size) to EOF after writing. @@ -1904,7 +1905,8 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of S3FS_PRN_ERR("failed to load uninitialized area and multipart uploading it(errno=%d)", result); return result; } - pseudo_obj->SetUntreated(start, 0); + + pseudo_obj->ClearUntreated(); } }else{ // already start multipart uploading @@ -1918,6 +1920,7 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of } if(0 < wsize){ pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED); + pseudo_obj->AddUntreated(start, wsize); } // Load uninitialized area which starts from (start + size) to EOF after writing. @@ -1931,13 +1934,11 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of // check multipart uploading if(pseudo_obj->IsUploading()){ + // get last untreated part(maximum size is multipart size) off_t untreated_start = 0; off_t untreated_size = 0; - pseudo_obj->GetUntreated(untreated_start, untreated_size); - - untreated_size += wsize; - if(S3fsCurl::GetMultipartSize() <= untreated_size){ - // over one multipart size + if(pseudo_obj->GetLastUntreated(untreated_start, untreated_size, S3fsCurl::GetMultipartSize())){ + // when multipart max size is reached if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){ S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(untreated_start), static_cast(untreated_size), physical_fd); return result; @@ -1951,7 +1952,7 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of S3FS_PRN_ERR("failed to truncate file(physical_fd=%d).", physical_fd); return -errno; } - pseudo_obj->SetUntreated(untreated_start + untreated_size, 0); + pseudo_obj->ClearUntreated(untreated_start, untreated_size); } } return wsize; @@ -1992,7 +1993,8 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, S3FS_PRN_ERR("failed to load uninitialized area and multipart uploading it(errno=%d)", result); return result; } - pseudo_obj->SetUntreated(start, 0); + + pseudo_obj->ClearUntreated(); } }else{ // already start multipart uploading @@ -2006,17 +2008,16 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, } if(0 < wsize){ pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED); + pseudo_obj->AddUntreated(start, wsize); } // check multipart uploading if(pseudo_obj->IsUploading()){ + // get last untreated part(maximum size is multipart size) off_t untreated_start = 0; off_t untreated_size = 0; - pseudo_obj->GetUntreated(untreated_start, untreated_size); - - untreated_size += wsize; - if(S3fsCurl::GetMultipartSize() <= untreated_size){ - // over one multipart size + if(pseudo_obj->GetLastUntreated(untreated_start, untreated_size, S3fsCurl::GetMultipartSize())){ + // when multipart max size is reached if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){ S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(untreated_start), static_cast(untreated_size), physical_fd); return result; @@ -2030,7 +2031,7 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, S3FS_PRN_ERR("failed to truncate file(physical_fd=%d).", physical_fd); return -errno; } - pseudo_obj->SetUntreated(untreated_start + untreated_size, 0); + pseudo_obj->ClearUntreated(untreated_start, untreated_size); } } return wsize; diff --git a/src/fdcache_fdinfo.cpp b/src/fdcache_fdinfo.cpp index c6d4e1e..77b0a05 100644 --- a/src/fdcache_fdinfo.cpp +++ b/src/fdcache_fdinfo.cpp @@ -216,28 +216,35 @@ void PseudoFdInfo::ClearUntreated(bool lock_already_held) { AutoLock auto_lock(&upload_list_lock, lock_already_held ? AutoLock::ALREADY_LOCKED : AutoLock::NONE); - untreated_start = 0; - untreated_size = 0; + untreated_list.ClearAll(); } -bool PseudoFdInfo::GetUntreated(off_t& start, off_t& size) +bool PseudoFdInfo::ClearUntreated(off_t start, off_t size) { AutoLock auto_lock(&upload_list_lock); - start = untreated_start; - size = untreated_size; - - return true; + return untreated_list.ClearParts(start, size); } -bool PseudoFdInfo::SetUntreated(off_t start, off_t size) +bool PseudoFdInfo::GetUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size) { AutoLock auto_lock(&upload_list_lock); - untreated_start = start; - untreated_size = size; + return untreated_list.GetPart(start, size, max_size, min_size); +} - return true; +bool PseudoFdInfo::GetLastUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size) +{ + AutoLock auto_lock(&upload_list_lock); + + return untreated_list.GetLastUpdatedPart(start, size, max_size, min_size); +} + +bool PseudoFdInfo::AddUntreated(off_t start, off_t size) +{ + AutoLock auto_lock(&upload_list_lock); + + return untreated_list.AddPart(start, size); } /* diff --git a/src/fdcache_fdinfo.h b/src/fdcache_fdinfo.h index 17b81a8..7d083ec 100644 --- a/src/fdcache_fdinfo.h +++ b/src/fdcache_fdinfo.h @@ -21,6 +21,8 @@ #ifndef S3FS_FDCACHE_FDINFO_H_ #define S3FS_FDCACHE_FDINFO_H_ +#include "fdcache_untreated.h" + //------------------------------------------------ // Class PseudoFdInfo //------------------------------------------------ @@ -32,9 +34,9 @@ class PseudoFdInfo int flags; // flags at open std::string upload_id; filepart_list_t upload_list; - off_t untreated_start; // untreated start position - off_t untreated_size; // untreated size + UntreatedParts untreated_list; // list of untreated parts that have been written and not yet uploaded(for streamupload) etaglist_t etag_entities; // list of etag string entities(to maintain the etag entity even if MPPART_INFO is destroyed) + bool is_lock_init; pthread_mutex_t upload_list_lock; // protects upload_id and upload_list @@ -62,8 +64,10 @@ class PseudoFdInfo bool AppendUploadPart(off_t start, off_t size, bool is_copy = false, int* ppartnum = NULL, std::string** ppetag = NULL); void ClearUntreated(bool lock_already_held = false); - bool GetUntreated(off_t& start, off_t& size); - bool SetUntreated(off_t start, off_t size); + bool ClearUntreated(off_t start, off_t size); + bool GetUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE); + bool GetLastUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE); + bool AddUntreated(off_t start, off_t size); }; typedef std::map fdinfo_map_t; diff --git a/src/fdcache_untreated.cpp b/src/fdcache_untreated.cpp new file mode 100644 index 0000000..78ace1b --- /dev/null +++ b/src/fdcache_untreated.cpp @@ -0,0 +1,284 @@ +/* + * s3fs - FUSE-based file system backed by Amazon S3 + * + * Copyright(C) 2007 Takeshi Nakatani + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +#include +#include +#include + +#include "common.h" +#include "s3fs.h" +#include "fdcache_untreated.h" +#include "autolock.h" + +//------------------------------------------------ +// UntreatedParts methods +//------------------------------------------------ +UntreatedParts::UntreatedParts() : last_tag(0) //, is_lock_init(false) +{ + pthread_mutexattr_t attr; + pthread_mutexattr_init(&attr); +#if S3FS_PTHREAD_ERRORCHECK + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK); +#endif + + int result; + if(0 != (result = pthread_mutex_init(&untreated_list_lock, &attr))){ + S3FS_PRN_CRIT("failed to init untreated_list_lock: %d", result); + abort(); + } + is_lock_init = true; +} + +UntreatedParts::~UntreatedParts() +{ + if(is_lock_init){ + int result; + if(0 != (result = pthread_mutex_destroy(&untreated_list_lock))){ + S3FS_PRN_CRIT("failed to destroy untreated_list_lock: %d", result); + abort(); + } + is_lock_init = false; + } +} + +bool UntreatedParts::empty() +{ + AutoLock auto_lock(&untreated_list_lock); + return untreated_list.empty(); +} + +bool UntreatedParts::AddPart(off_t start, off_t size) +{ + if(start < 0 || size <= 0){ + S3FS_PRN_ERR("Paramter are wrong(start=%lld, size=%lld).", static_cast(start), static_cast(size)); + return false; + } + AutoLock auto_lock(&untreated_list_lock); + + ++last_tag; + + // Check the overlap with the existing part and add the part. + for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){ + if(iter->stretch(start, size, last_tag)){ + // the part was stretched, thus check if it overlaps with next parts + untreated_list_t::iterator niter = iter; + for(++niter; niter != untreated_list.end(); ){ + if(!iter->stretch(niter->start, niter->size, last_tag)){ + // This next part does not overlap with the current part + break; + } + // Since the parts overlap and the current part is stretched, delete this next part. + niter = untreated_list.erase(niter); + } + // success to stretch and compress existed parts + return true; + + }else if((start + size) < iter->start){ + // The part to add should be inserted before the current part. + untreated_list.insert(iter, untreatedpart(start, size, last_tag)); + return true; + } + } + // There are no overlapping parts in the untreated_list, then add the part at end of list + untreated_list.push_back(untreatedpart(start, size, last_tag)); + return true; +} + +bool UntreatedParts::RowGetPart(off_t& start, off_t& size, off_t max_size, off_t min_size, bool lastpart) +{ + if(max_size <= 0 || min_size < 0 || max_size < min_size){ + S3FS_PRN_ERR("Paramter are wrong(max_size=%lld, min_size=%lld).", static_cast(max_size), static_cast(min_size)); + return false; + } + AutoLock auto_lock(&untreated_list_lock); + + // Check the overlap with the existing part and add the part. + for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){ + if(!lastpart || iter->untreated_tag == last_tag){ + if(min_size <= iter->size){ + if(iter->size <= max_size){ + // whole part( min <= part size <= max ) + start = iter->start; + size = iter->size; + }else{ + // Partially take out part( max < part size ) + start = iter->start; + size = max_size; + } + return true; + }else{ + if(lastpart){ + return false; + } + } + } + } + return false; +} + +// [NOTE] +// The part with the last tag cannot be taken out if it has not reached max_size. +// +bool UntreatedParts::TakeoutPart(off_t& start, off_t& size, off_t max_size, off_t min_size) +{ + if(max_size <= 0 || min_size < 0 || max_size < min_size){ + S3FS_PRN_ERR("Paramter are wrong(max_size=%lld, min_size=%lld).", static_cast(max_size), static_cast(min_size)); + return false; + } + AutoLock auto_lock(&untreated_list_lock); + + // Check the overlap with the existing part and add the part. + for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){ + if(iter->untreated_tag == last_tag){ + // Last updated part + if(max_size <= iter->size){ + // Take out only when the maximum part size is exceeded + start = iter->start; + size = max_size; + iter->start = iter->start + max_size; + iter->size = iter->size - max_size; + + if(iter->size == 0){ + untreated_list.erase(iter); + } + return true; + } + }else{ + // Parts updated in the past + if(min_size <= iter->size){ + if(iter->size <= max_size){ + // Take out the whole part( min <= part size <= max ) + start = iter->start; + size = iter->size; + untreated_list.erase(iter); + }else{ + // Partially take out part( max < part size ) + start = iter->start; + size = max_size; + iter->start = iter->start + max_size; + iter->size = iter->size - max_size; + } + return true; + } + } + } + return false; +} + +// [NOTE] +// This method returns the part from the beginning, ignoring conditions +// such as whether it is being updated(the last updated part) or less +// than the minimum size. +// +bool UntreatedParts::TakeoutPartFromBegin(off_t& start, off_t& size, off_t max_size) +{ + if(max_size <= 0){ + S3FS_PRN_ERR("Paramter is wrong(max_size=%lld).", static_cast(max_size)); + return false; + } + AutoLock auto_lock(&untreated_list_lock); + + if(untreated_list.empty()){ + return false; + } + + untreated_list_t::iterator iter = untreated_list.begin(); + if(iter->size <= max_size){ + // Take out the whole part( part size <= max ) + start = iter->start; + size = iter->size; + + untreated_list.erase(iter); + }else{ + // Take out only when the maximum part size is exceeded + start = iter->start; + size = max_size; + + iter->start = iter->start + max_size; + iter->size = iter->size - max_size; + } + return true; +} + +// [NOTE] +// If size is specified as 0, all areas(parts) after start will be deleted. +// +bool UntreatedParts::ClearParts(off_t start, off_t size) +{ + if(start < 0 || size < 0){ + S3FS_PRN_ERR("Paramter are wrong(start=%lld, size=%lld).", static_cast(start), static_cast(size)); + return false; + } + AutoLock auto_lock(&untreated_list_lock); + + if(untreated_list.empty()){ + return true; + } + + // Check the overlap with the existing part. + for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ){ + if(0 != size && (start + size) <= iter->start){ + // clear area is in front of iter area, no more to do. + break; + }else if(start <= iter->start){ + if(0 != size && (start + size) <= (iter->start + iter->size)){ + // clear area overlaps with iter area(on the start side) + iter->size = (iter->start + iter->size) - (start + size); + iter->start = start + size; + if(0 == iter->size){ + iter = untreated_list.erase(iter); + } + }else{ + // clear area overlaps with all of iter area + iter = untreated_list.erase(iter); + } + }else if(start < (iter->start + iter->size)){ + // clear area overlaps with iter area(on the end side) + if(0 == size || (iter->start + iter->size) <= (start + size) ){ + // start to iter->end is clear + iter->size = start - iter->start; + }else{ + // parse current part + iter->size = start - iter->start; + + // add new part + off_t next_start = start + size; + off_t next_size = (iter->start + iter->size) - (start + size); + long next_tag = iter->untreated_tag; + ++iter; + iter = untreated_list.insert(iter, untreatedpart(next_start, next_size, next_tag)); + ++iter; + } + }else{ + // clear area is in behind of iter area + ++iter; + } + } + return true; +} + +/* +* Local variables: +* tab-width: 4 +* c-basic-offset: 4 +* End: +* vim600: expandtab sw=4 ts=4 fdm=marker +* vim<600: expandtab sw=4 ts=4 +*/ diff --git a/src/fdcache_untreated.h b/src/fdcache_untreated.h new file mode 100644 index 0000000..349541e --- /dev/null +++ b/src/fdcache_untreated.h @@ -0,0 +1,70 @@ +/* + * s3fs - FUSE-based file system backed by Amazon S3 + * + * Copyright(C) 2007 Randy Rizun + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +#ifndef S3FS_FDCACHE_UNTREATED_H_ +#define S3FS_FDCACHE_UNTREATED_H_ + +//------------------------------------------------ +// Class UntreatedParts +//------------------------------------------------ +class UntreatedParts +{ + private: + pthread_mutex_t untreated_list_lock; // protects untreated_list + bool is_lock_init; + + untreated_list_t untreated_list; + long last_tag; // [NOTE] Use this to identify the latest updated part. + + private: + bool RowGetPart(off_t& start, off_t& size, off_t max_size, off_t min_size, bool lastpart); + + public: + UntreatedParts(); + ~UntreatedParts(); + + bool empty(); + + bool AddPart(off_t start, off_t size); + + // [NOTE] + // The following method does not return parts smaller than mini_size. + // You can avoid it by setting min_size to 0. + // + bool GetPart(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE) { return RowGetPart(start, size, max_size, min_size, false); } + bool GetLastUpdatedPart(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE) { return RowGetPart(start, size, max_size, min_size, true); } + + bool TakeoutPart(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE); + bool TakeoutPartFromBegin(off_t& start, off_t& size, off_t max_size); + + bool ClearParts(off_t start, off_t size); + bool ClearAll() { return ClearParts(0, 0); } +}; + +#endif // S3FS_FDCACHE_UNTREATED_H_ + +/* +* Local variables: +* tab-width: 4 +* c-basic-offset: 4 +* End: +* vim600: expandtab sw=4 ts=4 fdm=marker +* vim<600: expandtab sw=4 ts=4 +*/ diff --git a/src/types.h b/src/types.h index 6830ef1..8005ac6 100644 --- a/src/types.h +++ b/src/types.h @@ -171,7 +171,7 @@ enum signature_type_t { }; //---------------------------------------------- -// etaglist_t / filepart +// etaglist_t / filepart / untreatedpart //---------------------------------------------- typedef std::list etaglist_t; @@ -220,6 +220,60 @@ struct filepart typedef std::list filepart_list_t; +// +// Each part information for Untreated parts +// +struct untreatedpart +{ + off_t start; // untreated start position + off_t size; // number of untreated bytes + long untreated_tag; // untreated part tag + + untreatedpart(off_t part_start = 0, off_t part_size = 0, long part_untreated_tag = 0) : start(part_start), size(part_size), untreated_tag(part_untreated_tag) + { + if(part_start < 0 || part_size <= 0){ + clear(); // wrong parameter, so clear value. + } + } + + ~untreatedpart() + { + clear(); + } + + void clear() + { + start = 0; + size = 0; + untreated_tag = 0; + } + + bool check_overlap(off_t chk_start, off_t chk_size) + { + if(chk_start < 0 || chk_size <= 0 || (chk_start + chk_size) < start || (start + size) < chk_start){ + return false; + } + return true; + } + + bool stretch(off_t add_start, off_t add_size, long tag) + { + if(!check_overlap(add_start, add_size)){ + return false; + } + off_t new_start = std::min(start, add_start); + off_t new_next_start = std::max((start + size), (add_start + add_size)); + + start = new_start; + size = new_next_start - new_start; + untreated_tag = tag; + + return true; + } +}; + +typedef std::list untreated_list_t; + //------------------------------------------------------------------- // mimes_t //-------------------------------------------------------------------