mirror of https://github.com/s3fs-fuse/s3fs-fuse.git
Split some methods of FdEntity class by uploading mode
- Split FdEntity::Write method by uploading mode
- Split FdEntity::RowFlush method by uploading mode
This commit is contained in:
parent 881025cc9e
commit c30acbbf90
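
For orientation, the dispatch that this commit introduces in both FdEntity::RowFlush and FdEntity::Write is summarized by the sketch below. This is a simplified illustration, not the committed code; the real methods take a PseudoFdInfo* and a target path, as shown in the diff that follows.

    // Simplified view of the new upload-mode dispatch (illustrative only).
    // "nomultipart" corresponds to the -o nomultipart mount option;
    // FdEntity::mixmultipart selects the mix (copy-capable) multipart path.
    const char* ChooseFlushPath(bool nomultipart, bool mixmultipart)
    {
        if(nomultipart){
            return "RowFlushNoMultipart";   // one PutRequest of the whole object
        }else if(mixmultipart){
            return "RowFlushMixMultipart";  // multipart upload that can copy unchanged parts
        }else{
            return "RowFlushMultipart";     // plain multipart upload
        }
    }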
@@ -1312,6 +1312,20 @@ off_t FdEntity::BytesModified()
return pagelist.BytesModified();
}

// [NOTE]
// There are conditions that allow you to perform multipart uploads.
//
// According to the AWS spec:
// - 1 to 10,000 parts are allowed
// - minimum size of parts is 5MB (except for the last part)
//
// For example, if you set the minimum part size to 5MB, you can upload
// a maximum (5 * 10,000)MB file.
// The part size can be changed in MB units, then the maximum file size
// that can be handled can be further increased.
// Files smaller than the minimum part size will not be multipart uploaded,
// but will be uploaded as a single part (normally).
//
int FdEntity::RowFlush(int fd, const char* tpath, bool force_sync)
{
S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(tpath), path.c_str(), fd, physical_fd);
@@ -1319,11 +1333,8 @@ int FdEntity::RowFlush(int fd, const char* tpath, bool force_sync)
if(-1 == physical_fd){
return -EBADF;
}
int result = 0;
std::string tmppath = path;

AutoLock auto_lock(&fdent_lock);
headers_t tmporgmeta = orgmeta;

// check pseudo fd and its flag
fdinfo_map_t::iterator miter = pseudo_fd_map.find(fd);
@@ -1343,80 +1354,243 @@ int FdEntity::RowFlush(int fd, const char* tpath, bool force_sync)
return 0;
}

int result;
if(nomultipart){
// No multipart upload
result = RowFlushNoMultipart(pseudo_obj, tpath);
}else if(FdEntity::mixmultipart){
// Mix multipart upload
result = RowFlushMixMultipart(pseudo_obj, tpath);
}else{
// Normal multipart upload
result = RowFlushMultipart(pseudo_obj, tpath);
}

return result;
}
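
The note above bounds the object size that a given part size can cover: the ceiling is simply the part size times the 10,000-part limit, which is what the MAX_MULTIPART_CNT * S3fsCurl::GetMultipartSize() checks below enforce. A small self-contained calculation, assuming the limits quoted in the comment:

    #include <cstdint>
    #include <cstdio>

    int main()
    {
        const int64_t max_parts    = 10000;  // AWS: 1 to 10,000 parts per upload
        const int64_t part_size_mb = 5;      // minimum part size, except for the last part

        // 5 MB parts give 50,000 MB (about 48.8 GiB) per object; raising the part
        // size (s3fs's multipart_size option, in MB) raises the ceiling proportionally.
        printf("max object size: %lld MB\n", static_cast<long long>(max_parts * part_size_mb));
        return 0;
    }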

// [NOTE]
// Both fdent_lock and fdent_data_lock must be locked before calling.
//
int FdEntity::RowFlushNoMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
{
S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(tpath), path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd);

if(-1 == physical_fd || !pseudo_obj){
return -EBADF;
}

if(pseudo_obj->IsUploading()){
S3FS_PRN_ERR("Why uploading now, even though s3fs is No Multipart uploading mode.");
return -EBADF;
}

int result;
std::string tmppath = path;
headers_t tmporgmeta = orgmeta;

// If there is no loading all of the area, loading all area.
off_t restsize = pagelist.GetTotalUnloadedPageSize();
if(0 < restsize){
if(!pseudo_obj->IsUploading()){
// check disk space
if(ReserveDiskSpace(restsize)){
// enough disk space
// Load all uninitialized area(no mix multipart uploading)
if(!FdEntity::mixmultipart){
result = Load(/*start=*/ 0, /*size=*/ 0, AutoLock::ALREADY_LOCKED);
}
FdManager::FreeReservedDiskSpace(restsize);
if(0 != result){
S3FS_PRN_ERR("failed to upload all area(errno=%d)", result);
return result;
}
}else{
// no enough disk space
if(nomultipart){
S3FS_PRN_WARN("Not enough local storage to flush: [path=%s][pseudo_fd=%d][physical_fd=%d]", path.c_str(), fd, physical_fd);
return -ENOSPC; // No space left on device
}
if(0 != (result = NoCachePreMultipartPost(pseudo_obj))){
S3FS_PRN_ERR("failed to switch multipart uploading with no cache(errno=%d)", result);
return result;
}
// upload all by multipart uploading
if(0 != (result = NoCacheLoadAndPost(pseudo_obj))){
S3FS_PRN_ERR("failed to upload all area by multipart uploading(errno=%d)", result);
return result;
}
}
// check disk space
if(!ReserveDiskSpace(restsize)){
// no enough disk space
S3FS_PRN_WARN("Not enough local storage to flush: [path=%s][pseudo_fd=%d][physical_fd=%d]", path.c_str(), pseudo_obj->GetPseudoFd(), physical_fd);
return -ENOSPC; // No space left on device
}
}
FdManager::FreeReservedDiskSpace(restsize);

// Always load all uninitialized area
if(0 != (result = Load(/*start=*/ 0, /*size=*/ 0, AutoLock::ALREADY_LOCKED))){
S3FS_PRN_ERR("failed to upload all area(errno=%d)", result);
return result;
}

// check size
if(pagelist.Size() > MAX_MULTIPART_CNT * S3fsCurl::GetMultipartSize()){
S3FS_PRN_ERR("Part count exceeds %d. Increase multipart size and try again.", MAX_MULTIPART_CNT);
return -EFBIG;
}

// backup upload file size
struct stat st;
memset(&st, 0, sizeof(struct stat));
if(-1 == fstat(physical_fd, &st)){
S3FS_PRN_ERR("fstat is failed by errno(%d), but continue...", errno);
}

S3fsCurl s3fscurl(true);
result = s3fscurl.PutRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd);

// reset uploaded file size
size_orgmeta = st.st_size;

if(0 == result){
pagelist.ClearAllModified();
}
return result;
}
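
One pattern worth noting in RowFlushNoMultipart above: every successful ReserveDiskSpace(restsize) must be paired with FdManager::FreeReservedDiskSpace(restsize) on each exit path. A hypothetical RAII guard, sketched here with stand-in functions rather than the real s3fs classes, would make that pairing automatic:

    #include <sys/types.h>

    // Stand-ins for the calls used in the diff (FdEntity::ReserveDiskSpace /
    // FdManager::FreeReservedDiskSpace); the real ones account for free space
    // in the local cache directory.
    static bool ReserveDiskSpace(off_t size) { (void)size; return true; }
    static void FreeReservedDiskSpace(off_t size) { (void)size; }

    // Hypothetical guard: reserve on construction, release once on destruction.
    class ScopedDiskReserve
    {
        public:
            explicit ScopedDiskReserve(off_t size) : size_(size), ok_(ReserveDiskSpace(size)) {}
            ~ScopedDiskReserve() { if(ok_){ FreeReservedDiskSpace(size_); } }
            bool ok() const { return ok_; }
        private:
            off_t size_;
            bool  ok_;
    };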

// [NOTE]
// Both fdent_lock and fdent_data_lock must be locked before calling.
//
int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
{
S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(tpath), path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd);

if(-1 == physical_fd || !pseudo_obj){
return -EBADF;
}

int result = 0;

if(!pseudo_obj->IsUploading()){
// Start uploading

// If there is no loading all of the area, loading all area.
off_t restsize = pagelist.GetTotalUnloadedPageSize();

// Check rest size and free disk space
if(0 < restsize && !ReserveDiskSpace(restsize)){
// no enough disk space
if(0 != (result = NoCachePreMultipartPost(pseudo_obj))){
S3FS_PRN_ERR("failed to switch multipart uploading with no cache(errno=%d)", result);
return result;
}
// upload all by multipart uploading
if(0 != (result = NoCacheLoadAndPost(pseudo_obj))){
S3FS_PRN_ERR("failed to upload all area by multipart uploading(errno=%d)", result);
return result;
}

}else{
// already start multipart uploading
// enough disk space or no rest size
std::string tmppath = path;
headers_t tmporgmeta = orgmeta;

FdManager::FreeReservedDiskSpace(restsize);

// Load all uninitialized area(no mix multipart uploading)
if(0 != (result = Load(/*start=*/ 0, /*size=*/ 0, AutoLock::ALREADY_LOCKED))){
S3FS_PRN_ERR("failed to upload all area(errno=%d)", result);
return result;
}

// backup upload file size
struct stat st;
memset(&st, 0, sizeof(struct stat));
if(-1 == fstat(physical_fd, &st)){
S3FS_PRN_ERR("fstat is failed by errno(%d), but continue...", errno);
}

if(pagelist.Size() > MAX_MULTIPART_CNT * S3fsCurl::GetMultipartSize()){
S3FS_PRN_ERR("Part count exceeds %d. Increase multipart size and try again.", MAX_MULTIPART_CNT);
return -EFBIG;

}else if(pagelist.Size() >= S3fsCurl::GetMultipartSize()){
// multipart uploading
result = S3fsCurl::ParallelMultipartUploadRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd);

}else{
// normal uploading (too small part size)
S3fsCurl s3fscurl(true);
result = s3fscurl.PutRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd);
}

// reset uploaded file size
size_orgmeta = st.st_size;
}

}else{
// Already start uploading

// upload rest data
off_t untreated_start = 0;
off_t untreated_size = 0;
pseudo_obj->GetUntreated(untreated_start, untreated_size);

if(0 < untreated_size){
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
return result;
}
pseudo_obj->ClearUntreated();
}
// complete multipart uploading.
if(0 != (result = NoCacheCompleteMultipartPost(pseudo_obj))){
S3FS_PRN_ERR("failed to complete(finish) multipart post for file(physical_fd=%d).", physical_fd);
return result;
}
// truncate file to zero
if(-1 == ftruncate(physical_fd, 0)){
// So the file has already been removed, skip error.
S3FS_PRN_ERR("failed to truncate file(physical_fd=%d) to zero, but continue...", physical_fd);
}
// put pending headers
if(0 != (result = UploadPendingMeta())){
return result;
}
}

if(0 == result){
pagelist.ClearAllModified();
}
return result;
}
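
In the "Already start uploading" branch above, PseudoFdInfo tracks an "untreated" region: bytes already written into the local cache file but not yet sent as a part. The sketch below models that bookkeeping with a plain struct; the real GetUntreated/SetUntreated/ClearUntreated live in PseudoFdInfo and may carry extra state:

    #include <sys/types.h>

    // Simplified model of the untreated-region bookkeeping used by RowFlushMultipart().
    struct UntreatedArea
    {
        off_t start = 0;   // offset of the first byte not yet uploaded as a part
        off_t size  = 0;   // number of such bytes

        void Get(off_t& s, off_t& sz) const { s = start; sz = size; }   // like GetUntreated()
        void Set(off_t s, off_t sz)         { start = s; size = sz; }   // like SetUntreated()
        void Clear()                        { start = 0; size = 0; }    // like ClearUntreated()
    };

On flush, anything still untreated is posted as the final part(s) via NoCacheMultipartPost(), the upload is finished with NoCacheCompleteMultipartPost(), and the bookkeeping is cleared, which is exactly the sequence in the branch above.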

// [NOTE]
// Both fdent_lock and fdent_data_lock must be locked before calling.
//
int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
{
S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(tpath), path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd);

if(-1 == physical_fd || !pseudo_obj){
return -EBADF;
}

int result = 0;

if(!pseudo_obj->IsUploading()){
// normal uploading
//
// Make decision to do multi upload (or not) based upon file size
//
// According to the AWS spec:
// - 1 to 10,000 parts are allowed
// - minimum size of parts is 5MB (except for the last part)
//
// For our application, we will define minimum part size to be 10MB (10 * 2^20 Bytes)
// maximum file size will be 64 GB - 2 ** 36
//
// Initially uploads will be done serially
//
// If file is > 20MB, then multipart will kick in
//
if(pagelist.Size() > MAX_MULTIPART_CNT * S3fsCurl::GetMultipartSize()){
// close f ?
S3FS_PRN_ERR("Part count exceeds %d. Increase multipart size and try again.", MAX_MULTIPART_CNT);
return -EFBIG;
}
// Start uploading

// seek to head of file.
if(0 != lseek(physical_fd, 0, SEEK_SET)){
S3FS_PRN_ERR("lseek error(%d)", errno);
return -errno;
}
// backup upload file size
struct stat st;
memset(&st, 0, sizeof(struct stat));
if(-1 == fstat(physical_fd, &st)){
S3FS_PRN_ERR("fstat is failed by errno(%d), but continue...", errno);
}
// If there is no loading all of the area, loading all area.
off_t restsize = pagelist.GetTotalUnloadedPageSize();

if(pagelist.Size() >= S3fsCurl::GetMultipartSize() && !nomultipart){
if(FdEntity::mixmultipart){
// multipart uploading can use copy api
// Check rest size and free disk space
if(0 < restsize && !ReserveDiskSpace(restsize)){
// no enough disk space
if(0 != (result = NoCachePreMultipartPost(pseudo_obj))){
S3FS_PRN_ERR("failed to switch multipart uploading with no cache(errno=%d)", result);
return result;
}
// upload all by multipart uploading
if(0 != (result = NoCacheLoadAndPost(pseudo_obj))){
S3FS_PRN_ERR("failed to upload all area by multipart uploading(errno=%d)", result);
return result;
}

}else{
// enough disk space or no rest size
std::string tmppath = path;
headers_t tmporgmeta = orgmeta;

FdManager::FreeReservedDiskSpace(restsize);

// backup upload file size
struct stat st;
memset(&st, 0, sizeof(struct stat));
if(-1 == fstat(physical_fd, &st)){
S3FS_PRN_ERR("fstat is failed by errno(%d), but continue...", errno);
}

if(pagelist.Size() > MAX_MULTIPART_CNT * S3fsCurl::GetMultipartSize()){
S3FS_PRN_ERR("Part count exceeds %d. Increase multipart size and try again.", MAX_MULTIPART_CNT);
return -EFBIG;

}else if(pagelist.Size() >= S3fsCurl::GetMultipartSize()){
// mix multipart uploading

// This is to ensure that each part is 5MB or more.
// If the part is less than 5MB, download it.
@@ -1440,30 +1614,25 @@ int FdEntity::RowFlush(int fd, const char* tpath, bool force_sync)
result = S3fsCurl::ParallelMixMultipartUploadRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd, mixuppages);

}else{
// multipart uploading not using copy api
result = S3fsCurl::ParallelMultipartUploadRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd);
}
}else{
// If there are unloaded pages, they are loaded at here.
if(0 != (result = Load(/*start=*/ 0, /*size=*/ 0, AutoLock::ALREADY_LOCKED))){
S3FS_PRN_ERR("failed to load parts before uploading object(%d)", result);
return result;
// normal uploading (too small part size)

// If there are unloaded pages, they are loaded at here.
if(0 != (result = Load(/*start=*/ 0, /*size=*/ 0, AutoLock::ALREADY_LOCKED))){
S3FS_PRN_ERR("failed to load parts before uploading object(%d)", result);
return result;
}

S3fsCurl s3fscurl(true);
result = s3fscurl.PutRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd);
}

S3fsCurl s3fscurl(true);
result = s3fscurl.PutRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd);
// reset uploaded file size
size_orgmeta = st.st_size;
}

// seek to head of file.
if(0 == result && 0 != lseek(physical_fd, 0, SEEK_SET)){
S3FS_PRN_ERR("lseek error(%d)", errno);
return -errno;
}

// reset uploaded file size
size_orgmeta = st.st_size;

}else{
// Already start uploading

// upload rest data
off_t untreated_start = 0;
off_t untreated_size = 0;
@@ -1593,7 +1762,7 @@ ssize_t FdEntity::Write(int fd, const char* bytes, off_t start, size_t size)

PseudoFdInfo* pseudo_obj = NULL;
if(-1 == physical_fd || NULL == (pseudo_obj = CheckPseudoFdFlags(fd, false))){
S3FS_PRN_DBG("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not writable", fd, physical_fd, path.c_str());
S3FS_PRN_ERR("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not writable", fd, physical_fd, path.c_str());
return -EBADF;
}

@@ -1615,8 +1784,94 @@ ssize_t FdEntity::Write(int fd, const char* bytes, off_t start, size_t size)
pagelist.SetPageLoadedStatus(pagelist.Size(), start - pagelist.Size(), PageList::PAGE_MODIFIED);
}

int result = 0;
ssize_t wsize;
if(nomultipart){
// No multipart upload
wsize = WriteNoMultipart(pseudo_obj, bytes, start, size);
}else if(FdEntity::mixmultipart){
// Mix multipart upload
wsize = WriteMixMultipart(pseudo_obj, bytes, start, size);
}else{
// Normal multipart upload
wsize = WriteMultipart(pseudo_obj, bytes, start, size);
}

return wsize;
}

// [NOTE]
// Both fdent_lock and fdent_data_lock must be locked before calling.
//
ssize_t FdEntity::WriteNoMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size)
{
S3FS_PRN_DBG("[path=%s][pseudo_fd=%d][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, static_cast<long long int>(start), size);

if(-1 == physical_fd || !pseudo_obj){
S3FS_PRN_ERR("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not writable", (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, path.c_str());
return -EBADF;
}

int result = 0;

if(pseudo_obj->IsUploading()){
S3FS_PRN_ERR("Why uploading now, even though s3fs is No Multipart uploading mode.");
return -EBADF;
}

// check disk space
off_t restsize = pagelist.GetTotalUnloadedPageSize(0, start) + size;
if(!ReserveDiskSpace(restsize)){
// no enough disk space
S3FS_PRN_WARN("Not enough local storage to cache write request: [path=%s][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), physical_fd, static_cast<long long int>(start), size);
return -ENOSPC; // No space left on device
}

// Load uninitialized area which starts from 0 to (start + size) before writing.
if(0 < start){
result = Load(0, start, AutoLock::ALREADY_LOCKED);
}

FdManager::FreeReservedDiskSpace(restsize);
if(0 != result){
S3FS_PRN_ERR("failed to load uninitialized area before writing(errno=%d)", result);
return result;
}

// Writing
ssize_t wsize;
if(-1 == (wsize = pwrite(physical_fd, bytes, size, start))){
S3FS_PRN_ERR("pwrite failed. errno(%d)", errno);
return -errno;
}
if(0 < wsize){
pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED);
}

// Load uninitialized area which starts from (start + size) to EOF after writing.
if(pagelist.Size() > start + static_cast<off_t>(size)){
result = Load(start + size, pagelist.Size(), AutoLock::ALREADY_LOCKED);
if(0 != result){
S3FS_PRN_ERR("failed to load uninitialized area after writing(errno=%d)", result);
return result;
}
}

return wsize;
}
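
WriteNoMultipart above loads the uninitialized ranges on both sides of the write ([0, start) before the pwrite and [start+size, EOF) after it) because the no-multipart flush path later uploads the whole cache file with a single PutRequest, so the complete object has to exist locally. A toy illustration of that invariant, using made-up helpers in place of Load() and the page list:

    #include <cassert>
    #include <sys/types.h>

    // Made-up stand-ins for FdEntity::Load() and PageList.
    struct FakePages { off_t loaded_upto; off_t total; };
    static void LoadUpTo(FakePages& p, off_t end) { if(end > p.loaded_upto){ p.loaded_upto = end; } }

    int main()
    {
        FakePages pages = {0, 100};        // 100-byte object, nothing cached yet
        off_t start = 40;                  // incoming write: 20 bytes at offset 40
        off_t size  = 20;

        LoadUpTo(pages, start);            // load [0, start) before writing
        LoadUpTo(pages, start + size);     // the pwrite itself covers [start, start+size)
        LoadUpTo(pages, pages.total);      // load [start+size, EOF) after writing

        assert(pages.loaded_upto == pages.total);   // whole object cached: ready for one PUT
        return 0;
    }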

// [NOTE]
// Both fdent_lock and fdent_data_lock must be locked before calling.
//
ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size)
{
S3FS_PRN_DBG("[path=%s][pseudo_fd=%d][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, static_cast<long long int>(start), size);

if(-1 == physical_fd || !pseudo_obj){
S3FS_PRN_ERR("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not writable", (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, path.c_str());
return -EBADF;
}

int result = 0;

if(!pseudo_obj->IsUploading()){
// check disk space
@@ -1625,10 +1880,8 @@ ssize_t FdEntity::Write(int fd, const char* bytes, off_t start, size_t size)
// enough disk space

// Load uninitialized area which starts from 0 to (start + size) before writing.
if(!FdEntity::mixmultipart){
if(0 < start){
result = Load(0, start, AutoLock::ALREADY_LOCKED);
}
if(0 < start){
result = Load(0, start, AutoLock::ALREADY_LOCKED);
}

FdManager::FreeReservedDiskSpace(restsize);
@@ -1638,11 +1891,7 @@ ssize_t FdEntity::Write(int fd, const char* bytes, off_t start, size_t size)
}
}else{
// no enough disk space
if (nomultipart) {
S3FS_PRN_WARN("Not enough local storage to cache write request: [path=%s][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), physical_fd, static_cast<long long int>(start), size);
return -ENOSPC; // No space left on device
}
if ((start + static_cast<off_t>(size)) <= S3fsCurl::GetMultipartSize()) {
if((start + static_cast<off_t>(size)) <= S3fsCurl::GetMultipartSize()){
S3FS_PRN_WARN("Not enough local storage to cache write request till multipart upload can start: [path=%s][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), physical_fd, static_cast<long long int>(start), size);
return -ENOSPC; // No space left on device
}
@@ -1662,6 +1911,7 @@ ssize_t FdEntity::Write(int fd, const char* bytes, off_t start, size_t size)
}

// Writing
ssize_t wsize;
if(-1 == (wsize = pwrite(physical_fd, bytes, size, start))){
S3FS_PRN_ERR("pwrite failed. errno(%d)", errno);
return -errno;
@@ -1671,15 +1921,92 @@ ssize_t FdEntity::Write(int fd, const char* bytes, off_t start, size_t size)
}

// Load uninitialized area which starts from (start + size) to EOF after writing.
if(!FdEntity::mixmultipart){
if(pagelist.Size() > start + static_cast<off_t>(size)){
result = Load(start + size, pagelist.Size(), AutoLock::ALREADY_LOCKED);
if(0 != result){
S3FS_PRN_ERR("failed to load uninitialized area after writing(errno=%d)", result);
if(pagelist.Size() > start + static_cast<off_t>(size)){
result = Load(start + size, pagelist.Size(), AutoLock::ALREADY_LOCKED);
if(0 != result){
S3FS_PRN_ERR("failed to load uninitialized area after writing(errno=%d)", result);
return result;
}
}

// check multipart uploading
if(pseudo_obj->IsUploading()){
off_t untreated_start = 0;
off_t untreated_size = 0;
pseudo_obj->GetUntreated(untreated_start, untreated_size);

untreated_size += wsize;
if(S3fsCurl::GetMultipartSize() <= untreated_size){
// over one multipart size
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
return result;
}

// [NOTE]
// truncate file to zero and set length to part offset + size
// after this, file length is (offset + size), but file does not use any disk space.
//
if(-1 == ftruncate(physical_fd, 0) || -1 == ftruncate(physical_fd, (untreated_start + untreated_size))){
S3FS_PRN_ERR("failed to truncate file(physical_fd=%d).", physical_fd);
return -errno;
}
pseudo_obj->SetUntreated(untreated_start + untreated_size, 0);
}
}
return wsize;
}
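
The [NOTE] inside WriteMultipart above relies on a property of ftruncate: shrinking a file to zero releases its data blocks, and immediately extending it back to offset + size restores the logical length as a sparse hole, so the already-uploaded part stops consuming cache space while later writes still land at the right offsets. A standalone demonstration of that behavior (the temporary file, sizes, and output are illustrative, not taken from s3fs):

    #include <cstdio>
    #include <cstdlib>
    #include <vector>
    #include <unistd.h>
    #include <sys/stat.h>

    int main()
    {
        char name[] = "/tmp/sparse_demo_XXXXXX";
        int  fd     = mkstemp(name);
        if(fd < 0){ perror("mkstemp"); return 1; }

        // Fill the file as if it held a part that has just been uploaded.
        const off_t part_end = 8 * 1024 * 1024;
        std::vector<char> buf(static_cast<size_t>(part_end), 'x');
        if(pwrite(fd, buf.data(), buf.size(), 0) != static_cast<ssize_t>(buf.size())){ perror("pwrite"); return 1; }

        // Same trick as the diff: drop the blocks, then restore the length.
        if(-1 == ftruncate(fd, 0) || -1 == ftruncate(fd, part_end)){ perror("ftruncate"); return 1; }

        struct stat st;
        fstat(fd, &st);
        // st_size is still part_end, but st_blocks is (close to) zero: a sparse file.
        printf("size=%lld blocks=%lld\n", static_cast<long long>(st.st_size), static_cast<long long>(st.st_blocks));
        close(fd);
        unlink(name);
        return 0;
    }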

// [NOTE]
// Both fdent_lock and fdent_data_lock must be locked before calling.
//
ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size)
{
S3FS_PRN_DBG("[path=%s][pseudo_fd=%d][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, static_cast<long long int>(start), size);

if(-1 == physical_fd || !pseudo_obj){
S3FS_PRN_ERR("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not writable", (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, path.c_str());
return -EBADF;
}

int result;

if(!pseudo_obj->IsUploading()){
// check disk space
off_t restsize = pagelist.GetTotalUnloadedPageSize(0, start) + size;
if(ReserveDiskSpace(restsize)){
// enough disk space
FdManager::FreeReservedDiskSpace(restsize);
}else{
// no enough disk space
if((start + static_cast<off_t>(size)) <= S3fsCurl::GetMultipartSize()){
S3FS_PRN_WARN("Not enough local storage to cache write request till multipart upload can start: [path=%s][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), physical_fd, static_cast<long long int>(start), size);
return -ENOSPC; // No space left on device
}
if(0 != (result = NoCachePreMultipartPost(pseudo_obj))){
S3FS_PRN_ERR("failed to switch multipart uploading with no cache(errno=%d)", result);
return result;
}
// start multipart uploading
if(0 != (result = NoCacheLoadAndPost(pseudo_obj, 0, start))){
S3FS_PRN_ERR("failed to load uninitialized area and multipart uploading it(errno=%d)", result);
return result;
}
pseudo_obj->SetUntreated(start, 0);
}
}else{
// already start multipart uploading
}

// Writing
ssize_t wsize;
if(-1 == (wsize = pwrite(physical_fd, bytes, size, start))){
S3FS_PRN_ERR("pwrite failed. errno(%d)", errno);
return -errno;
}
if(0 < wsize){
pagelist.SetPageLoadedStatus(start, wsize, PageList::PAGE_LOAD_MODIFIED);
}

// check multipart uploading
if(pseudo_obj->IsUploading()){

@@ -67,6 +67,12 @@ class FdEntity
int NoCachePreMultipartPost(PseudoFdInfo* pseudo_obj);
int NoCacheMultipartPost(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size);
int NoCacheCompleteMultipartPost(PseudoFdInfo* pseudo_obj);
int RowFlushNoMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
int RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
int RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
ssize_t WriteNoMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
ssize_t WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
ssize_t WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
int UploadPendingMeta();

public: