Added info object about multipart uploading for each pseudo fd

(and fixed typo about method name)
This commit is contained in:
Takeshi Nakatani 2021-05-23 16:55:25 +00:00 committed by Andrew Gaul
parent ac578d188e
commit c2c56d0263
12 changed files with 215 additions and 31 deletions

View File

@ -3980,7 +3980,7 @@ int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& met
return 0;
}
int S3fsCurl::MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, etaglist_t& list)
int S3fsCurl::MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, int part_num, std::string* petag)
{
S3FS_PRN_INFO3("[upload_id=%s][tpath=%s][fd=%d][offset=%lld][size=%lld]", upload_id.c_str(), SAFESTRPTR(tpath), fd, static_cast<long long int>(offset), static_cast<long long int>(size));
@ -4000,12 +4000,12 @@ int S3fsCurl::MultipartUploadRequest(const std::string& upload_id, const char* t
partdata.size = size;
b_partdata_startpos = partdata.startpos;
b_partdata_size = partdata.size;
partdata.add_etag_list(&list);
partdata.add_etag(petag);
// upload part
int result;
if(0 != (result = UploadMultipartPostRequest(tpath, list.size(), upload_id))){
S3FS_PRN_ERR("failed uploading part(%d)", result);
if(0 != (result = UploadMultipartPostRequest(tpath, part_num, upload_id))){
S3FS_PRN_ERR("failed uploading %d part by error(%d)", part_num, result);
close(fd2);
return result;
}

View File

@ -393,7 +393,7 @@ class S3fsCurl
int MultipartListRequest(std::string& body);
int AbortMultipartUpload(const char* tpath, const std::string& upload_id);
int MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta, bool is_copy);
int MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, etaglist_t& list);
int MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, int part_num, std::string* petag);
int MultipartRenameRequest(const char* from, const char* to, headers_t& meta, off_t size);
// methods(variables)

View File

@ -347,9 +347,11 @@ bool FdManager::HaveLseekHole()
bool FdManager::HasOpenEntityFd(const char* path)
{
AutoLock auto_lock(&FdManager::fd_manager_lock);
FdEntity* ent;
int fd = -1;
if(NULL == (ent = FdManager::singleton.GetFdEntity(path, fd, false))){
if(NULL == (ent = FdManager::singleton.GetFdEntity(path, fd, false, true))){
return false;
}
return (0 < ent->GetOpenCount());
@ -416,14 +418,14 @@ FdManager::~FdManager()
}
}
FdEntity* FdManager::GetFdEntity(const char* path, int& existfd, bool newfd)
FdEntity* FdManager::GetFdEntity(const char* path, int& existfd, bool newfd, bool lock_already_held)
{
S3FS_PRN_INFO3("[path=%s][fd=%d]", SAFESTRPTR(path), existfd);
if(!path || '\0' == path[0]){
return NULL;
}
AutoLock auto_lock(&FdManager::fd_manager_lock);
AutoLock auto_lock(&FdManager::fd_manager_lock, lock_already_held ? AutoLock::ALREADY_LOCKED : AutoLock::NONE);
fdent_map_t::iterator iter = fent.find(std::string(path));
if(fent.end() != iter && iter->second){
@ -476,6 +478,7 @@ FdEntity* FdManager::Open(int& fd, const char* path, headers_t* pmeta, off_t siz
if(!path || '\0' == path[0]){
return NULL;
}
AutoLock auto_lock(&FdManager::fd_manager_lock);
// search in mapping by key(path)
@ -507,7 +510,7 @@ FdEntity* FdManager::Open(int& fd, const char* path, headers_t* pmeta, off_t siz
}
// (re)open
if(-1 == (fd = ent->Open(pmeta, size, time, flags, no_fd_lock_wait ? AutoLock::NO_WAIT : AutoLock::NON))){
if(-1 == (fd = ent->Open(pmeta, size, time, flags, no_fd_lock_wait ? AutoLock::NO_WAIT : AutoLock::NONE))){
S3FS_PRN_ERR("failed to (re)open and create new pseudo fd for path(%s).", path);
return NULL;
}
@ -523,7 +526,7 @@ FdEntity* FdManager::Open(int& fd, const char* path, headers_t* pmeta, off_t siz
ent = new FdEntity(path, cache_path.c_str());
// open
if(-1 == (fd = ent->Open(pmeta, size, time, flags, no_fd_lock_wait ? AutoLock::NO_WAIT : AutoLock::NON))){
if(-1 == (fd = ent->Open(pmeta, size, time, flags, no_fd_lock_wait ? AutoLock::NO_WAIT : AutoLock::NONE))){
delete ent;
return NULL;
}
@ -543,7 +546,6 @@ FdEntity* FdManager::Open(int& fd, const char* path, headers_t* pmeta, off_t siz
FdManager::MakeRandomTempPath(path, tmppath);
fent[tmppath] = ent;
}
}else{
return NULL;
}
@ -554,7 +556,7 @@ FdEntity* FdManager::Open(int& fd, const char* path, headers_t* pmeta, off_t siz
// This method does not create a new pseudo fd.
// It just finds existfd and returns the corresponding entity.
//
FdEntity* FdManager::GetExistFdEntiy(const char* path, int existfd)
FdEntity* FdManager::GetExistFdEntity(const char* path, int existfd)
{
S3FS_PRN_DBG("[path=%s][existfd=%d]", SAFESTRPTR(path), existfd);

View File

@ -76,9 +76,9 @@ class FdManager
static bool HaveLseekHole();
// Return FdEntity associated with path, returning NULL on error. This operation increments the reference count; callers must decrement via Close after use.
FdEntity* GetFdEntity(const char* path, int& existfd, bool newfd = true);
FdEntity* GetFdEntity(const char* path, int& existfd, bool newfd = true, bool lock_already_held = false);
FdEntity* Open(int& fd, const char* path, headers_t* pmeta = NULL, off_t size = -1, time_t time = -1, int flags = O_RDONLY, bool force_tmpfile = false, bool is_create = true, bool no_fd_lock_wait = false);
FdEntity* GetExistFdEntiy(const char* path, int existfd = -1);
FdEntity* GetExistFdEntity(const char* path, int existfd = -1);
FdEntity* OpenExistFdEntiy(const char* path, int& fd, int flags = O_RDONLY);
void Rename(const std::string &from, const std::string &to);
bool Close(FdEntity* ent, int fd);

View File

@ -112,12 +112,12 @@ FdEntity* AutoFdEntity::Open(const char* path, headers_t* pmeta, off_t size, tim
// [NOTE]
// the fd obtained by this method is not a newly created pseudo fd.
//
FdEntity* AutoFdEntity::GetExistFdEntiy(const char* path, int existfd)
FdEntity* AutoFdEntity::GetExistFdEntity(const char* path, int existfd)
{
Close();
FdEntity* ent;
if(NULL == (ent = FdManager::get()->GetExistFdEntiy(path, existfd))){
if(NULL == (ent = FdManager::get()->GetExistFdEntity(path, existfd))){
return NULL;
}
return ent;

View File

@ -51,7 +51,7 @@ class AutoFdEntity
int GetPseudoFd() const { return pseudo_fd; }
FdEntity* Open(const char* path, headers_t* pmeta = NULL, off_t size = -1, time_t time = -1, int flags = O_RDONLY, bool force_tmpfile = false, bool is_create = true, bool no_fd_lock_wait = false);
FdEntity* GetExistFdEntiy(const char* path, int existfd = -1);
FdEntity* GetExistFdEntity(const char* path, int existfd = -1);
FdEntity* OpenExistFdEntiy(const char* path, int flags = O_RDONLY);
};

View File

@ -1594,7 +1594,7 @@ ssize_t FdEntity::Write(int fd, const char* bytes, off_t start, size_t size)
S3FS_PRN_DBG("[path=%s][pseudo_fd=%d][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), fd, physical_fd, static_cast<long long int>(start), size);
PseudoFdInfo* pseudo_obj = NULL;
if(-1 == physical_fd || NULL == (pseudo_obj = CheckPseudoFdFlags(fd, true))){
if(-1 == physical_fd || NULL == (pseudo_obj = CheckPseudoFdFlags(fd, false))){
S3FS_PRN_DBG("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not writable", fd, physical_fd, path.c_str());
return -EBADF;
}

View File

@ -46,10 +46,6 @@ class FdEntity
pthread_mutex_t fdent_data_lock;// protects the following members
PageList pagelist;
std::string upload_id; // for no cached multipart uploading when no disk space
etaglist_t etaglist; // for no cached multipart uploading when no disk space
off_t mp_start; // start position for no cached multipart(write method only)
off_t mp_size; // size for no cached multipart(write method only)
std::string cachepath; // local cache file path
// (if this is empty, does not load/save pagelist.)
std::string mirrorpath; // mirror file path to local cache file path
@ -63,10 +59,14 @@ class FdEntity
void Clear();
ino_t GetInode();
int OpenMirrorFile();
int NoCacheLoadAndPost(off_t start = 0, off_t size = 0); // size=0 means loading to end
bool CheckPseudoFdFlags(int fd, bool writable, bool lock_already_held = false);
int NoCacheLoadAndPost(PseudoFdInfo* pseudo_obj, off_t start = 0, off_t size = 0); // size=0 means loading to end
PseudoFdInfo* CheckPseudoFdFlags(int fd, bool writable, bool lock_already_held = false);
bool IsUploading(bool lock_already_held = false);
bool SetAllStatus(bool is_loaded); // [NOTE] not locking
bool SetAllStatusUnloaded() { return SetAllStatus(false); }
int NoCachePreMultipartPost(PseudoFdInfo* pseudo_obj);
int NoCacheMultipartPost(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size);
int NoCacheCompleteMultipartPost(PseudoFdInfo* pseudo_obj);
int UploadPendingMeta();
public:
@ -109,9 +109,6 @@ class FdEntity
bool SetContentType(const char* path);
int Load(off_t start = 0, off_t size = 0, bool lock_already_held = false, bool is_modified_flag = false); // size=0 means loading to end
int NoCachePreMultipartPost();
int NoCacheMultipartPost(int tgfd, off_t start, off_t size);
int NoCacheCompleteMultipartPost();
off_t BytesModified();
int RowFlush(int fd, const char* tpath, bool force_sync = false);

View File

@ -33,6 +33,18 @@
//------------------------------------------------
PseudoFdInfo::PseudoFdInfo(int fd, int open_flags) : pseudo_fd(-1), physical_fd(fd), flags(0) //, is_lock_init(false)
{
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
#if S3FS_PTHREAD_ERRORCHECK
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
#endif
int result;
if(0 != (result = pthread_mutex_init(&upload_list_lock, &attr))){
S3FS_PRN_CRIT("failed to init upload_list_lock: %d", result);
abort();
}
is_lock_init = true;
if(-1 != physical_fd){
pseudo_fd = PseudoFdManager::Get();
flags = open_flags;
@ -41,6 +53,14 @@ PseudoFdInfo::PseudoFdInfo(int fd, int open_flags) : pseudo_fd(-1), physical_fd(
PseudoFdInfo::~PseudoFdInfo()
{
if(is_lock_init){
int result;
if(0 != (result = pthread_mutex_destroy(&upload_list_lock))){
S3FS_PRN_CRIT("failed to destroy upload_list_lock: %d", result);
abort();
}
is_lock_init = false;
}
Clear();
}
@ -88,6 +108,131 @@ bool PseudoFdInfo::Readable() const
return true;
}
bool PseudoFdInfo::ClearUploadInfo(bool is_cancel_mp, bool lock_already_held)
{
AutoLock auto_lock(&upload_list_lock, lock_already_held ? AutoLock::ALREADY_LOCKED : AutoLock::NONE);
if(is_cancel_mp){
// [TODO]
// If we have any uploaded parts, we should delete them here.
// We haven't implemented it yet, but it will be implemented in the future.
// (User can delete them in the utility mode of s3fs.)
//
S3FS_PRN_INFO("Implementation of cancellation process for multipart upload is awaited.");
}
upload_id.erase();
upload_list.clear();
ClearUntreated(true);
return true;
}
bool PseudoFdInfo::InitialUploadInfo(const std::string& id)
{
AutoLock auto_lock(&upload_list_lock);
if(!ClearUploadInfo(true, true)){
return false;
}
upload_id = id;
return true;
}
bool PseudoFdInfo::GetUploadId(std::string& id) const
{
if(IsUploading()){
S3FS_PRN_ERR("Multipart Upload has not started yet.");
return false;
}
id = upload_id;
return true;
}
bool PseudoFdInfo::GetEtaglist(etaglist_t& list)
{
if(IsUploading()){
S3FS_PRN_ERR("Multipart Upload has not started yet.");
return false;
}
AutoLock auto_lock(&upload_list_lock);
list.clear();
for(mppart_list_t::const_iterator iter = upload_list.begin(); iter != upload_list.end(); ++iter){
list.push_back(iter->etag);
}
return !list.empty();
}
// [NOTE]
// This method adds a part for a multipart upload.
// The added new part must be an area that is exactly continuous with the
// immediately preceding part.
// An error will occur if it is discontinuous or if it overlaps with an
// existing area.
//
bool PseudoFdInfo::AppendUploadPart(off_t start, off_t size, bool is_copy, int* ppartnum, std::string** ppetag)
{
if(IsUploading()){
S3FS_PRN_ERR("Multipart Upload has not started yet.");
return false;
}
AutoLock auto_lock(&upload_list_lock);
off_t next_start_pos = 0;
if(!upload_list.empty()){
next_start_pos = upload_list.back().start + upload_list.back().size;
}
if(start != next_start_pos){
S3FS_PRN_ERR("The expected starting position for the next part is %lld, but %lld was specified.", static_cast<long long int>(next_start_pos), static_cast<long long int>(start));
return false;
}
// add new part
MPPART_INFO newpart(start, size, is_copy, NULL);
upload_list.push_back(newpart);
// set part number
if(ppartnum){
*ppartnum = upload_list.size();
}
// set etag pointer
if(ppetag){
*ppetag = &(upload_list.back().etag);
}
return true;
}
void PseudoFdInfo::ClearUntreated(bool lock_already_held)
{
AutoLock auto_lock(&upload_list_lock, lock_already_held ? AutoLock::ALREADY_LOCKED : AutoLock::NONE);
untreated_start = 0;
untreated_size = 0;
}
bool PseudoFdInfo::GetUntreated(off_t& start, off_t& size)
{
AutoLock auto_lock(&upload_list_lock);
start = untreated_start;
size = untreated_size;
return true;
}
bool PseudoFdInfo::SetUntreated(off_t start, off_t size)
{
AutoLock auto_lock(&upload_list_lock);
untreated_start = start;
untreated_size = size;
return true;
}
/*
* Local variables:
* tab-width: 4

View File

@ -21,6 +21,22 @@
#ifndef S3FS_FDCACHE_FDINFO_H_
#define S3FS_FDCACHE_FDINFO_H_
//------------------------------------------------
// Structure
//------------------------------------------------
typedef struct _mppart_info
{
off_t start;
size_t size;
bool is_copy;
std::string etag;
_mppart_info(off_t part_start = -1, off_t part_size = 0, bool is_copy_part = false, const char* petag = NULL) : start(part_start), size(part_size), is_copy(is_copy_part), etag(NULL == petag ? "" : petag) {}
}MPPART_INFO;
typedef std::list<MPPART_INFO> mppart_list_t;
//------------------------------------------------
// Class PseudoFdInfo
//------------------------------------------------
@ -30,6 +46,13 @@ class PseudoFdInfo
int pseudo_fd;
int physical_fd;
int flags; // flags at open
std::string upload_id;
mppart_list_t upload_list;
off_t untreated_start; // untreated start position
off_t untreated_size; // untreated size
bool is_lock_init;
pthread_mutex_t upload_list_lock; // protects upload_id and upload_list
private:
bool Clear();
@ -45,6 +68,18 @@ class PseudoFdInfo
bool Readable() const;
bool Set(int fd, int open_flags);
bool ClearUploadInfo(bool is_clear_part = false, bool lock_already_held = false);
bool InitialUploadInfo(const std::string& id);
bool IsUploading() const { return !upload_id.empty(); }
bool GetUploadId(std::string& id) const;
bool GetEtaglist(etaglist_t& list);
bool AppendUploadPart(off_t start, off_t size, bool is_copy = false, int* ppartnum = NULL, std::string** ppetag = NULL);
void ClearUntreated(bool lock_already_held = false);
bool GetUntreated(off_t& start, off_t& size);
bool SetUntreated(off_t start, off_t size);
};
typedef std::map<int, class PseudoFdInfo*> fdinfo_map_t;

View File

@ -2316,7 +2316,7 @@ static int s3fs_read(const char* _path, char* buf, size_t size, off_t offset, st
AutoFdEntity autoent;
FdEntity* ent;
if(NULL == (ent = autoent.GetExistFdEntiy(path, static_cast<int>(fi->fh)))){
if(NULL == (ent = autoent.GetExistFdEntity(path, static_cast<int>(fi->fh)))){
S3FS_PRN_ERR("could not find opened fd(%s)", path);
return -EIO;
}
@ -2344,7 +2344,7 @@ static int s3fs_write(const char* _path, const char* buf, size_t size, off_t off
AutoFdEntity autoent;
FdEntity* ent;
if(NULL == (ent = autoent.GetExistFdEntiy(path, static_cast<int>(fi->fh)))){
if(NULL == (ent = autoent.GetExistFdEntity(path, static_cast<int>(fi->fh)))){
S3FS_PRN_ERR("could not find opened fd(%s)", path);
return -EIO;
}
@ -2403,7 +2403,7 @@ static int s3fs_flush(const char* _path, struct fuse_file_info* fi)
AutoFdEntity autoent;
FdEntity* ent;
if(NULL != (ent = autoent.GetExistFdEntiy(path, static_cast<int>(fi->fh)))){
if(NULL != (ent = autoent.GetExistFdEntity(path, static_cast<int>(fi->fh)))){
ent->UpdateMtime(true); // clear the flag not to update mtime.
ent->UpdateCtime();
result = ent->Flush(static_cast<int>(fi->fh), false);
@ -2426,7 +2426,7 @@ static int s3fs_fsync(const char* _path, int datasync, struct fuse_file_info* fi
AutoFdEntity autoent;
FdEntity* ent;
if(NULL != (ent = autoent.GetExistFdEntiy(path, static_cast<int>(fi->fh)))){
if(NULL != (ent = autoent.GetExistFdEntity(path, static_cast<int>(fi->fh)))){
if(0 == datasync){
ent->UpdateMtime();
ent->UpdateCtime();

View File

@ -208,6 +208,11 @@ struct filepart
list->push_back(std::string());
petag = &list->back();
}
void add_etag(std::string* petagobj)
{
petag = petagobj;
}
};
//-------------------------------------------------------------------