Update s3fs.cpp

Make s3fs_flush asynchronous
This commit is contained in:
HunterChen 2023-02-21 17:12:23 +08:00
parent f8a825e9b9
commit 5feb1b544d

View File

@ -25,6 +25,8 @@
#include <dirent.h>
#include <sys/types.h>
#include <getopt.h>
#include <thread>
#include <mutex>
#include "common.h"
#include "s3fs.h"
@ -65,6 +67,12 @@ enum dirtype {
DIRTYPE_NOOBJ = 3,
};
typedef struct
{
std::string path;
struct fuse_file_info* fi;
}dto;
//-------------------------------------------------------------------
// Static variables
//-------------------------------------------------------------------
@ -100,7 +108,7 @@ static bool use_wtf8 = false;
static off_t fake_diskfree_size = -1; // default is not set(-1)
static int max_thread_count = 5; // default is 5
static bool update_parent_dir_stat= false; // default not updating parent directory stats
static std::mutex dto_lock;
//-------------------------------------------------------------------
// Global functions : prototype
//-------------------------------------------------------------------
@ -144,7 +152,9 @@ static int s3fs_check_service();
static bool set_mountpoint_attribute(struct stat& mpst);
static int set_bucket(const char* arg);
static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_args* outargs);
static void doRelease(const char* _path, struct fuse_file_info* fi);
static void flush(dto dto);
static void doFlush(dto dto);
//-------------------------------------------------------------------
// fuse interface functions
//-------------------------------------------------------------------
@ -171,7 +181,6 @@ static int s3fs_write(const char* path, const char* buf, size_t size, off_t offs
static int s3fs_statfs(const char* path, struct statvfs* stbuf);
static int s3fs_flush(const char* path, struct fuse_file_info* fi);
static int s3fs_fsync(const char* path, int datasync, struct fuse_file_info* fi);
static int s3fs_release(const char* path, struct fuse_file_info* fi);
static int s3fs_opendir(const char* path, struct fuse_file_info* fi);
static int s3fs_readdir(const char* path, void* buf, fuse_fill_dir_t filler, off_t offset, struct fuse_file_info* fi);
static int s3fs_access(const char* path, int mask);
@ -2808,8 +2817,31 @@ static int s3fs_statfs(const char* _path, struct statvfs* stbuf)
return 0;
}
static void doFlush(dto dto){
const char* path = dto.path.c_str();
struct fuse_file_info* fi=dto.fi;
AutoFdEntity autoent;
FdEntity* ent;
if(NULL != (ent = autoent.GetExistFdEntity(path, static_cast<int>(fi->fh)))){
ent->UpdateMtime(true); // clear the flag not to update mtime.
ent->UpdateCtime();
ent->Flush(static_cast<int>(fi->fh), AutoLock::NONE, false);
StatCache::getStatCacheData()->DelStat(path);
}
}
static void flush(dto dto)
{
struct fuse_file_info* fi=dto.fi;
std::thread thread (doFlush,dto);
thread.join();
doRelease(dto.path.c_str(),dto.fi);
free(fi);
}
static int s3fs_flush(const char* _path, struct fuse_file_info* fi)
{
dto_lock.lock();
WTF8_ENCODE(path)
int result;
@ -2828,24 +2860,23 @@ static int s3fs_flush(const char* _path, struct fuse_file_info* fi)
return result;
}
AutoFdEntity autoent;
FdEntity* ent;
if(NULL != (ent = autoent.GetExistFdEntity(path, static_cast<int>(fi->fh)))){
bool is_new_file = ent->IsDirtyNewFile();
ent->UpdateMtime(true); // clear the flag not to update mtime.
ent->UpdateCtime();
result = ent->Flush(static_cast<int>(fi->fh), AutoLock::NONE, false);
StatCache::getStatCacheData()->DelStat(path);
if(is_new_file){
// update parent directory timestamp
int update_result;
if(0 != (update_result = update_mctime_parent_directory(path))){
S3FS_PRN_ERR("succeed to create the file(%s), but could not update timestamp of its parent directory(result=%d).", path, update_result);
}
}
}
dto dto;
dto.path = path;
struct fuse_file_info* newFi=(fuse_file_info*)(malloc(sizeof(fuse_file_info)));
newFi->flags=fi->flags;
newFi->fh_old=fi->fh_old;
newFi->writepage=fi->writepage;
newFi->direct_io=fi->direct_io;
newFi->keep_cache=fi->keep_cache;
newFi->flush=fi->flush;
newFi->nonseekable=fi->nonseekable;
newFi->flock_release=fi->flock_release;
newFi->padding=fi->padding;
newFi->fh=fi->fh;
newFi->lock_owner=fi->lock_owner;
dto.fi=newFi;
std::thread (flush,dto).detach();
dto_lock.unlock();
S3FS_MALLOCTRIM(0);
return result;
@ -2888,7 +2919,7 @@ static int s3fs_fsync(const char* _path, int datasync, struct fuse_file_info* fi
return result;
}
static int s3fs_release(const char* _path, struct fuse_file_info* fi)
static void doRelease(const char* _path, struct fuse_file_info* fi)
{
WTF8_ENCODE(path)
S3FS_PRN_INFO("[path=%s][pseudo_fd=%llu]", path, (unsigned long long)(fi->fh));
@ -2918,7 +2949,7 @@ static int s3fs_release(const char* _path, struct fuse_file_info* fi)
FdEntity* ent;
if(NULL == (ent = autoent.Attach(path, static_cast<int>(fi->fh)))){
S3FS_PRN_ERR("could not find pseudo_fd(%llu) for path(%s)", (unsigned long long)(fi->fh), path);
return -EIO;
return;
}
bool is_new_file = ent->IsDirtyNewFile();
@ -2927,7 +2958,7 @@ static int s3fs_release(const char* _path, struct fuse_file_info* fi)
int result = ent->UploadPending(static_cast<int>(fi->fh), AutoLock::NONE);
if(0 != result){
S3FS_PRN_ERR("could not upload pending data(meta, etc) for pseudo_fd(%llu) / path(%s)", (unsigned long long)(fi->fh), path);
return result;
return;
}
if(is_new_file){
@ -2947,7 +2978,6 @@ static int s3fs_release(const char* _path, struct fuse_file_info* fi)
}
S3FS_MALLOCTRIM(0);
return 0;
}
static int s3fs_opendir(const char* _path, struct fuse_file_info* fi)
@ -5522,7 +5552,6 @@ int main(int argc, char* argv[])
s3fs_oper.statfs = s3fs_statfs;
s3fs_oper.flush = s3fs_flush;
s3fs_oper.fsync = s3fs_fsync;
s3fs_oper.release = s3fs_release;
s3fs_oper.opendir = s3fs_opendir;
s3fs_oper.readdir = s3fs_readdir;
s3fs_oper.init = s3fs_init;