Added UntreatedParts class instead of untreated upload info members in PseudoFdInfo

This commit is contained in:
Takeshi Nakatani 2021-07-11 12:38:36 +00:00 committed by Andrew Gaul
parent c30acbbf90
commit 945cc2ac54
7 changed files with 460 additions and 39 deletions

View File

@ -49,6 +49,7 @@ s3fs_SOURCES = \
fdcache_auto.cpp \
fdcache_fdinfo.cpp \
fdcache_pseudofd.cpp \
fdcache_untreated.cpp \
addhead.cpp \
sighandlers.cpp \
autolock.cpp \

View File

@ -1426,6 +1426,8 @@ int FdEntity::RowFlushNoMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
// reset uploaded file size
size_orgmeta = st.st_size;
pseudo_obj->ClearUntreated();
if(0 == result){
pagelist.ClearAllModified();
}
@ -1501,6 +1503,7 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
// reset uploaded file size
size_orgmeta = st.st_size;
}
pseudo_obj->ClearUntreated();
}else{
// Already start uploading
@ -1508,14 +1511,12 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
// upload rest data
off_t untreated_start = 0;
off_t untreated_size = 0;
pseudo_obj->GetUntreated(untreated_start, untreated_size);
if(0 < untreated_size){
if(pseudo_obj->GetLastUntreated(untreated_start, untreated_size, S3fsCurl::GetMultipartSize()) && 0 < untreated_size){
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
return result;
}
pseudo_obj->ClearUntreated();
pseudo_obj->ClearUntreated(untreated_start, untreated_size);
}
// complete multipart uploading.
if(0 != (result = NoCacheCompleteMultipartPost(pseudo_obj))){
@ -1629,6 +1630,7 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
// reset uploaded file size
size_orgmeta = st.st_size;
}
pseudo_obj->ClearUntreated();
}else{
// Already start uploading
@ -1636,15 +1638,13 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
// upload rest data
off_t untreated_start = 0;
off_t untreated_size = 0;
pseudo_obj->GetUntreated(untreated_start, untreated_size);
if(0 < untreated_size){
if(pseudo_obj->GetLastUntreated(untreated_start, untreated_size, S3fsCurl::GetMultipartSize()) && 0 < untreated_size){
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
return result;
}
pseudo_obj->ClearUntreated();
}
pseudo_obj->ClearUntreated(untreated_start, untreated_size);
}
// complete multipart uploading.
if(0 != (result = NoCacheCompleteMultipartPost(pseudo_obj))){
S3FS_PRN_ERR("failed to complete(finish) multipart post for file(physical_fd=%d).", physical_fd);
@ -1845,6 +1845,7 @@ ssize_t FdEntity::WriteNoMultipart(PseudoFdInfo* pseudo_obj, const char* bytes,
}
if(0 < wsize){
pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED);
pseudo_obj->AddUntreated(start, wsize);
}
// Load uninitialized area which starts from (start + size) to EOF after writing.
@ -1904,7 +1905,8 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of
S3FS_PRN_ERR("failed to load uninitialized area and multipart uploading it(errno=%d)", result);
return result;
}
pseudo_obj->SetUntreated(start, 0);
pseudo_obj->ClearUntreated();
}
}else{
// already start multipart uploading
@ -1918,6 +1920,7 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of
}
if(0 < wsize){
pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED);
pseudo_obj->AddUntreated(start, wsize);
}
// Load uninitialized area which starts from (start + size) to EOF after writing.
@ -1931,13 +1934,11 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of
// check multipart uploading
if(pseudo_obj->IsUploading()){
// get last untreated part(maximum size is multipart size)
off_t untreated_start = 0;
off_t untreated_size = 0;
pseudo_obj->GetUntreated(untreated_start, untreated_size);
untreated_size += wsize;
if(S3fsCurl::GetMultipartSize() <= untreated_size){
// over one multipart size
if(pseudo_obj->GetLastUntreated(untreated_start, untreated_size, S3fsCurl::GetMultipartSize())){
// when multipart max size is reached
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
return result;
@ -1951,7 +1952,7 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of
S3FS_PRN_ERR("failed to truncate file(physical_fd=%d).", physical_fd);
return -errno;
}
pseudo_obj->SetUntreated(untreated_start + untreated_size, 0);
pseudo_obj->ClearUntreated(untreated_start, untreated_size);
}
}
return wsize;
@ -1992,7 +1993,8 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes,
S3FS_PRN_ERR("failed to load uninitialized area and multipart uploading it(errno=%d)", result);
return result;
}
pseudo_obj->SetUntreated(start, 0);
pseudo_obj->ClearUntreated();
}
}else{
// already start multipart uploading
@ -2006,17 +2008,16 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes,
}
if(0 < wsize){
pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED);
pseudo_obj->AddUntreated(start, wsize);
}
// check multipart uploading
if(pseudo_obj->IsUploading()){
// get last untreated part(maximum size is multipart size)
off_t untreated_start = 0;
off_t untreated_size = 0;
pseudo_obj->GetUntreated(untreated_start, untreated_size);
untreated_size += wsize;
if(S3fsCurl::GetMultipartSize() <= untreated_size){
// over one multipart size
if(pseudo_obj->GetLastUntreated(untreated_start, untreated_size, S3fsCurl::GetMultipartSize())){
// when multipart max size is reached
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
return result;
@ -2030,7 +2031,7 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes,
S3FS_PRN_ERR("failed to truncate file(physical_fd=%d).", physical_fd);
return -errno;
}
pseudo_obj->SetUntreated(untreated_start + untreated_size, 0);
pseudo_obj->ClearUntreated(untreated_start, untreated_size);
}
}
return wsize;

View File

@ -216,28 +216,35 @@ void PseudoFdInfo::ClearUntreated(bool lock_already_held)
{
AutoLock auto_lock(&upload_list_lock, lock_already_held ? AutoLock::ALREADY_LOCKED : AutoLock::NONE);
untreated_start = 0;
untreated_size = 0;
untreated_list.ClearAll();
}
bool PseudoFdInfo::GetUntreated(off_t& start, off_t& size)
bool PseudoFdInfo::ClearUntreated(off_t start, off_t size)
{
AutoLock auto_lock(&upload_list_lock);
start = untreated_start;
size = untreated_size;
return true;
return untreated_list.ClearParts(start, size);
}
bool PseudoFdInfo::SetUntreated(off_t start, off_t size)
bool PseudoFdInfo::GetUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size)
{
AutoLock auto_lock(&upload_list_lock);
untreated_start = start;
untreated_size = size;
return untreated_list.GetPart(start, size, max_size, min_size);
}
return true;
bool PseudoFdInfo::GetLastUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size)
{
AutoLock auto_lock(&upload_list_lock);
return untreated_list.GetLastUpdatedPart(start, size, max_size, min_size);
}
bool PseudoFdInfo::AddUntreated(off_t start, off_t size)
{
AutoLock auto_lock(&upload_list_lock);
return untreated_list.AddPart(start, size);
}
/*

View File

@ -21,6 +21,8 @@
#ifndef S3FS_FDCACHE_FDINFO_H_
#define S3FS_FDCACHE_FDINFO_H_
#include "fdcache_untreated.h"
//------------------------------------------------
// Class PseudoFdInfo
//------------------------------------------------
@ -32,9 +34,9 @@ class PseudoFdInfo
int flags; // flags at open
std::string upload_id;
filepart_list_t upload_list;
off_t untreated_start; // untreated start position
off_t untreated_size; // untreated size
UntreatedParts untreated_list; // list of untreated parts that have been written and not yet uploaded(for streamupload)
etaglist_t etag_entities; // list of etag string entities(to maintain the etag entity even if MPPART_INFO is destroyed)
bool is_lock_init;
pthread_mutex_t upload_list_lock; // protects upload_id and upload_list
@ -62,8 +64,10 @@ class PseudoFdInfo
bool AppendUploadPart(off_t start, off_t size, bool is_copy = false, int* ppartnum = NULL, std::string** ppetag = NULL);
void ClearUntreated(bool lock_already_held = false);
bool GetUntreated(off_t& start, off_t& size);
bool SetUntreated(off_t start, off_t size);
bool ClearUntreated(off_t start, off_t size);
bool GetUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE);
bool GetLastUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE);
bool AddUntreated(off_t start, off_t size);
};
typedef std::map<int, class PseudoFdInfo*> fdinfo_map_t;

284
src/fdcache_untreated.cpp Normal file
View File

@ -0,0 +1,284 @@
/*
* s3fs - FUSE-based file system backed by Amazon S3
*
* Copyright(C) 2007 Takeshi Nakatani <ggtakec.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <cstdio>
#include <cstdlib>
#include <algorithm>
#include "common.h"
#include "s3fs.h"
#include "fdcache_untreated.h"
#include "autolock.h"
//------------------------------------------------
// UntreatedParts methods
//------------------------------------------------
UntreatedParts::UntreatedParts() : last_tag(0) //, is_lock_init(false)
{
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
#if S3FS_PTHREAD_ERRORCHECK
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
#endif
int result;
if(0 != (result = pthread_mutex_init(&untreated_list_lock, &attr))){
S3FS_PRN_CRIT("failed to init untreated_list_lock: %d", result);
abort();
}
is_lock_init = true;
}
UntreatedParts::~UntreatedParts()
{
if(is_lock_init){
int result;
if(0 != (result = pthread_mutex_destroy(&untreated_list_lock))){
S3FS_PRN_CRIT("failed to destroy untreated_list_lock: %d", result);
abort();
}
is_lock_init = false;
}
}
bool UntreatedParts::empty()
{
AutoLock auto_lock(&untreated_list_lock);
return untreated_list.empty();
}
bool UntreatedParts::AddPart(off_t start, off_t size)
{
if(start < 0 || size <= 0){
S3FS_PRN_ERR("Paramter are wrong(start=%lld, size=%lld).", static_cast<long long int>(start), static_cast<long long int>(size));
return false;
}
AutoLock auto_lock(&untreated_list_lock);
++last_tag;
// Check the overlap with the existing part and add the part.
for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){
if(iter->stretch(start, size, last_tag)){
// the part was stretched, thus check if it overlaps with next parts
untreated_list_t::iterator niter = iter;
for(++niter; niter != untreated_list.end(); ){
if(!iter->stretch(niter->start, niter->size, last_tag)){
// This next part does not overlap with the current part
break;
}
// Since the parts overlap and the current part is stretched, delete this next part.
niter = untreated_list.erase(niter);
}
// success to stretch and compress existed parts
return true;
}else if((start + size) < iter->start){
// The part to add should be inserted before the current part.
untreated_list.insert(iter, untreatedpart(start, size, last_tag));
return true;
}
}
// There are no overlapping parts in the untreated_list, then add the part at end of list
untreated_list.push_back(untreatedpart(start, size, last_tag));
return true;
}
bool UntreatedParts::RowGetPart(off_t& start, off_t& size, off_t max_size, off_t min_size, bool lastpart)
{
if(max_size <= 0 || min_size < 0 || max_size < min_size){
S3FS_PRN_ERR("Paramter are wrong(max_size=%lld, min_size=%lld).", static_cast<long long int>(max_size), static_cast<long long int>(min_size));
return false;
}
AutoLock auto_lock(&untreated_list_lock);
// Check the overlap with the existing part and add the part.
for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){
if(!lastpart || iter->untreated_tag == last_tag){
if(min_size <= iter->size){
if(iter->size <= max_size){
// whole part( min <= part size <= max )
start = iter->start;
size = iter->size;
}else{
// Partially take out part( max < part size )
start = iter->start;
size = max_size;
}
return true;
}else{
if(lastpart){
return false;
}
}
}
}
return false;
}
// [NOTE]
// The part with the last tag cannot be taken out if it has not reached max_size.
//
bool UntreatedParts::TakeoutPart(off_t& start, off_t& size, off_t max_size, off_t min_size)
{
if(max_size <= 0 || min_size < 0 || max_size < min_size){
S3FS_PRN_ERR("Paramter are wrong(max_size=%lld, min_size=%lld).", static_cast<long long int>(max_size), static_cast<long long int>(min_size));
return false;
}
AutoLock auto_lock(&untreated_list_lock);
// Check the overlap with the existing part and add the part.
for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){
if(iter->untreated_tag == last_tag){
// Last updated part
if(max_size <= iter->size){
// Take out only when the maximum part size is exceeded
start = iter->start;
size = max_size;
iter->start = iter->start + max_size;
iter->size = iter->size - max_size;
if(iter->size == 0){
untreated_list.erase(iter);
}
return true;
}
}else{
// Parts updated in the past
if(min_size <= iter->size){
if(iter->size <= max_size){
// Take out the whole part( min <= part size <= max )
start = iter->start;
size = iter->size;
untreated_list.erase(iter);
}else{
// Partially take out part( max < part size )
start = iter->start;
size = max_size;
iter->start = iter->start + max_size;
iter->size = iter->size - max_size;
}
return true;
}
}
}
return false;
}
// [NOTE]
// This method returns the part from the beginning, ignoring conditions
// such as whether it is being updated(the last updated part) or less
// than the minimum size.
//
bool UntreatedParts::TakeoutPartFromBegin(off_t& start, off_t& size, off_t max_size)
{
if(max_size <= 0){
S3FS_PRN_ERR("Paramter is wrong(max_size=%lld).", static_cast<long long int>(max_size));
return false;
}
AutoLock auto_lock(&untreated_list_lock);
if(untreated_list.empty()){
return false;
}
untreated_list_t::iterator iter = untreated_list.begin();
if(iter->size <= max_size){
// Take out the whole part( part size <= max )
start = iter->start;
size = iter->size;
untreated_list.erase(iter);
}else{
// Take out only when the maximum part size is exceeded
start = iter->start;
size = max_size;
iter->start = iter->start + max_size;
iter->size = iter->size - max_size;
}
return true;
}
// [NOTE]
// If size is specified as 0, all areas(parts) after start will be deleted.
//
bool UntreatedParts::ClearParts(off_t start, off_t size)
{
if(start < 0 || size < 0){
S3FS_PRN_ERR("Paramter are wrong(start=%lld, size=%lld).", static_cast<long long int>(start), static_cast<long long int>(size));
return false;
}
AutoLock auto_lock(&untreated_list_lock);
if(untreated_list.empty()){
return true;
}
// Check the overlap with the existing part.
for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ){
if(0 != size && (start + size) <= iter->start){
// clear area is in front of iter area, no more to do.
break;
}else if(start <= iter->start){
if(0 != size && (start + size) <= (iter->start + iter->size)){
// clear area overlaps with iter area(on the start side)
iter->size = (iter->start + iter->size) - (start + size);
iter->start = start + size;
if(0 == iter->size){
iter = untreated_list.erase(iter);
}
}else{
// clear area overlaps with all of iter area
iter = untreated_list.erase(iter);
}
}else if(start < (iter->start + iter->size)){
// clear area overlaps with iter area(on the end side)
if(0 == size || (iter->start + iter->size) <= (start + size) ){
// start to iter->end is clear
iter->size = start - iter->start;
}else{
// parse current part
iter->size = start - iter->start;
// add new part
off_t next_start = start + size;
off_t next_size = (iter->start + iter->size) - (start + size);
long next_tag = iter->untreated_tag;
++iter;
iter = untreated_list.insert(iter, untreatedpart(next_start, next_size, next_tag));
++iter;
}
}else{
// clear area is in behind of iter area
++iter;
}
}
return true;
}
/*
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: expandtab sw=4 ts=4 fdm=marker
* vim<600: expandtab sw=4 ts=4
*/

70
src/fdcache_untreated.h Normal file
View File

@ -0,0 +1,70 @@
/*
* s3fs - FUSE-based file system backed by Amazon S3
*
* Copyright(C) 2007 Randy Rizun <rrizun@gmail.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#ifndef S3FS_FDCACHE_UNTREATED_H_
#define S3FS_FDCACHE_UNTREATED_H_
//------------------------------------------------
// Class UntreatedParts
//------------------------------------------------
class UntreatedParts
{
private:
pthread_mutex_t untreated_list_lock; // protects untreated_list
bool is_lock_init;
untreated_list_t untreated_list;
long last_tag; // [NOTE] Use this to identify the latest updated part.
private:
bool RowGetPart(off_t& start, off_t& size, off_t max_size, off_t min_size, bool lastpart);
public:
UntreatedParts();
~UntreatedParts();
bool empty();
bool AddPart(off_t start, off_t size);
// [NOTE]
// The following method does not return parts smaller than mini_size.
// You can avoid it by setting min_size to 0.
//
bool GetPart(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE) { return RowGetPart(start, size, max_size, min_size, false); }
bool GetLastUpdatedPart(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE) { return RowGetPart(start, size, max_size, min_size, true); }
bool TakeoutPart(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE);
bool TakeoutPartFromBegin(off_t& start, off_t& size, off_t max_size);
bool ClearParts(off_t start, off_t size);
bool ClearAll() { return ClearParts(0, 0); }
};
#endif // S3FS_FDCACHE_UNTREATED_H_
/*
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: expandtab sw=4 ts=4 fdm=marker
* vim<600: expandtab sw=4 ts=4
*/

View File

@ -171,7 +171,7 @@ enum signature_type_t {
};
//----------------------------------------------
// etaglist_t / filepart
// etaglist_t / filepart / untreatedpart
//----------------------------------------------
typedef std::list<std::string> etaglist_t;
@ -220,6 +220,60 @@ struct filepart
typedef std::list<filepart> filepart_list_t;
//
// Each part information for Untreated parts
//
struct untreatedpart
{
off_t start; // untreated start position
off_t size; // number of untreated bytes
long untreated_tag; // untreated part tag
untreatedpart(off_t part_start = 0, off_t part_size = 0, long part_untreated_tag = 0) : start(part_start), size(part_size), untreated_tag(part_untreated_tag)
{
if(part_start < 0 || part_size <= 0){
clear(); // wrong parameter, so clear value.
}
}
~untreatedpart()
{
clear();
}
void clear()
{
start = 0;
size = 0;
untreated_tag = 0;
}
bool check_overlap(off_t chk_start, off_t chk_size)
{
if(chk_start < 0 || chk_size <= 0 || (chk_start + chk_size) < start || (start + size) < chk_start){
return false;
}
return true;
}
bool stretch(off_t add_start, off_t add_size, long tag)
{
if(!check_overlap(add_start, add_size)){
return false;
}
off_t new_start = std::min(start, add_start);
off_t new_next_start = std::max((start + size), (add_start + add_size));
start = new_start;
size = new_next_start - new_start;
untreated_tag = tag;
return true;
}
};
typedef std::list<untreatedpart> untreated_list_t;
//-------------------------------------------------------------------
// mimes_t
//-------------------------------------------------------------------