Add stream upload, which starts uploading parts before Flush

This commit is contained in:
Takeshi Nakatani 2021-11-02 12:16:48 +00:00 committed by Andrew Gaul
parent 3a0799ec18
commit d22e1dc018
15 changed files with 1762 additions and 128 deletions

View File

@ -54,6 +54,7 @@ s3fs_SOURCES = \
addhead.cpp \
sighandlers.cpp \
autolock.cpp \
threadpoolman.cpp \
common_auth.cpp
if USE_SSL_OPENSSL
s3fs_SOURCES += openssl_auth.cpp

View File

@ -1200,6 +1200,72 @@ int S3fsCurl::MapPutErrorResponse(int result)
return result;
}
// [NOTE]
// It is a factory method as utility because it requires an S3fsCurl object
// initialized for multipart upload from outside this class.
//
// Builds one ready-to-run S3fsCurl object for a single multipart-upload part,
// either a raw data upload from the local file (is_copy == false) or a
// server-side copy of a byte range of the existing object (is_copy == true).
//
// tpath     : object path
// fd        : file descriptor to read part data from (raw upload case)
// start/size: byte range of this part within the file/object
// part_num  : multipart part number (1-origin on the S3 side)
// petag     : caller-owned etag slot; stored directly into the curl object
// upload_id : multipart upload id previously obtained from S3
// result    : out parameter, 0 on success or a negative errno-style code
//
// Returns a heap-allocated S3fsCurl on success (caller takes ownership),
// or NULL with "result" set on failure.
//
S3fsCurl* S3fsCurl::CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result)
{
    // Validate parameters before allocating anything.
    if(!tpath || -1 == fd || start < 0 || size <= 0 || !petag){
        S3FS_PRN_ERR("Parameters are wrong: tpath(%s), fd(%d), start(%lld), size(%lld), petag(%s)", SAFESTRPTR(tpath), fd, static_cast<long long int>(start), static_cast<long long int>(size), (petag ? "not null" : "null"));
        result = -EIO;
        return NULL;
    }
    result = 0;

    S3fsCurl* s3fscurl = new S3fsCurl(true);

    if(!is_copy){
        // Raw upload: the part body is read from fd at [start, start+size).
        s3fscurl->partdata.fd         = fd;
        s3fscurl->partdata.startpos   = start;
        s3fscurl->partdata.size       = size;
        s3fscurl->partdata.is_copy    = is_copy;
        s3fscurl->partdata.petag      = petag;                          // [NOTE] be careful, the value is set directly
        s3fscurl->b_partdata_startpos = s3fscurl->partdata.startpos;
        s3fscurl->b_partdata_size     = s3fscurl->partdata.size;

        S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(start), static_cast<long long>(size), part_num);

        if(0 != (result = s3fscurl->UploadMultipartPostSetup(tpath, part_num, upload_id))){
            S3FS_PRN_ERR("failed uploading part setup(%d)", result);
            delete s3fscurl;
            return NULL;
        }
    }else{
        // Server-side copy: the part body is a byte range of the source object,
        // expressed through x-amz-copy-source / x-amz-copy-source-range headers.
        headers_t   meta;
        std::string srcresource;
        std::string srcurl;
        MakeUrlResource(get_realpath(tpath).c_str(), srcresource, srcurl);
        meta["x-amz-copy-source"]        = srcresource;
        std::ostringstream strrange;
        strrange << "bytes=" << start << "-" << (start + size - 1);
        meta["x-amz-copy-source-range"]  = strrange.str();

        s3fscurl->b_from          = SAFESTRPTR(tpath);
        s3fscurl->b_meta          = meta;
        s3fscurl->partdata.petag  = petag;                              // [NOTE] be careful, the value is set directly

        S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(start), static_cast<long long>(size), part_num);

        if(0 != (result = s3fscurl->CopyMultipartPostSetup(tpath, tpath, part_num, upload_id, meta))){
            S3FS_PRN_ERR("failed uploading part setup(%d)", result);
            delete s3fscurl;
            return NULL;
        }
    }

    // Run the lazy curl-option setup registered by the Setup call above.
    if(!s3fscurl->fpLazySetup || !s3fscurl->fpLazySetup(s3fscurl)){
        S3FS_PRN_ERR("failed lazy function setup for uploading part");
        result = -EIO;
        delete s3fscurl;
        return NULL;
    }
    return s3fscurl;
}
int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd)
{
int result;

View File

@ -45,7 +45,7 @@
// CURLOPT_SSL_ENABLE_ALPN 7.36.0 and later
// CURLOPT_KEEP_SENDING_ON_ERROR 7.51.0 and later
//
// s3fs uses these, if you build s3fs with the old libcurl,
// s3fs uses these, if you build s3fs with the old libcurl,
// substitute the following symbols to avoid errors.
// If the version of libcurl linked at runtime is old,
// curl_easy_setopt results in an error(CURLE_UNKNOWN_OPTION) and
@ -191,7 +191,7 @@ class S3fsCurl
std::vector<pthread_t> *completed_tids;
s3fscurl_lazy_setup fpLazySetup; // curl options for lazy setting function
CURLcode curlCode; // handle curl return
public:
static const long S3FSCURL_RESPONSECODE_NOTSET = -1;
static const long S3FSCURL_RESPONSECODE_FATAL_ERROR = -2;
@ -230,7 +230,6 @@ class S3fsCurl
static S3fsCurl* ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl);
// lazy functions for set curl options
static bool UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl);
static bool CopyMultipartPostSetCurlOpts(S3fsCurl* s3fscurl);
static bool PreGetObjectRequestSetCurlOpts(S3fsCurl* s3fscurl);
static bool PreHeadRequestSetCurlOpts(S3fsCurl* s3fscurl);
@ -259,7 +258,6 @@ class S3fsCurl
int CopyMultipartPostSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta);
bool UploadMultipartPostComplete();
bool CopyMultipartPostComplete();
bool MixMultipartPostComplete();
int MapPutErrorResponse(int result);
public:
@ -268,10 +266,14 @@ class S3fsCurl
static bool InitCredentialObject(S3fsCred* pcredobj);
static bool InitMimeType(const std::string& strFile);
static bool DestroyS3fsCurl();
static S3fsCurl* CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result);
static int ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd);
static int ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages);
static int ParallelGetObjectRequest(const char* tpath, int fd, off_t start, off_t size);
// lazy functions for set curl options(public)
static bool UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl);
// class methods(variables)
static std::string LookupMimeType(const std::string& name);
static bool SetCheckCertificate(bool isCertCheck);
@ -357,6 +359,7 @@ class S3fsCurl
int PreMultipartPostRequest(const char* tpath, headers_t& meta, std::string& upload_id, bool is_copy);
int CompleteMultipartPostRequest(const char* tpath, const std::string& upload_id, etaglist_t& parts);
int UploadMultipartPostRequest(const char* tpath, int part_num, const std::string& upload_id);
bool MixMultipartPostComplete();
int MultipartListRequest(std::string& body);
int AbortMultipartUpload(const char* tpath, const std::string& upload_id);
int MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta, bool is_copy);

View File

@ -43,6 +43,7 @@ static const int MAX_MULTIPART_CNT = 10 * 1000; // S3 multipart max coun
// FdEntity class variables
//------------------------------------------------
bool FdEntity::mixmultipart = true;
bool FdEntity::streamupload = false;
//------------------------------------------------
// FdEntity class methods
@ -54,6 +55,13 @@ bool FdEntity::SetNoMixMultipart()
return old;
}
// Enable or disable stream upload mode for all FdEntity objects.
// Returns the previous setting so the caller can restore it.
bool FdEntity::SetStreamUpload(bool isstream)
{
    const bool previous = streamupload;
    streamupload        = isstream;
    return previous;
}
int FdEntity::FillFile(int fd, unsigned char byte, off_t size, off_t start)
{
unsigned char bytes[1024 * 32]; // 32kb
@ -418,6 +426,13 @@ int FdEntity::Open(const headers_t* pmeta, off_t size, time_t time, int flags, A
AutoLock auto_data_lock(&fdent_data_lock);
// [NOTE]
// When the file size is incremental by truncating, it must be keeped
// as an untreated area, and this area is set to these variables.
//
off_t truncated_start = 0;
off_t truncated_size = 0;
if(-1 != physical_fd){
//
// already open file
@ -436,6 +451,14 @@ int FdEntity::Open(const headers_t* pmeta, off_t size, time_t time, int flags, A
return -EIO;
}
}
// set untreated area
if(0 <= size && size_orgmeta < size){
// set untreated area
truncated_start = size_orgmeta;
truncated_size = size - size_orgmeta;
}
// set original headers and set size.
off_t new_size = (0 <= size ? size : size_orgmeta);
if(pmeta){
@ -614,6 +637,12 @@ int FdEntity::Open(const headers_t* pmeta, off_t size, time_t time, int flags, A
size_orgmeta = 0;
}
// set untreated area
if(0 <= size && size_orgmeta < size){
truncated_start = size_orgmeta;
truncated_size = size - size_orgmeta;
}
// set mtime and ctime(set "x-amz-meta-mtime" and "x-amz-meta-ctime" in orgmeta)
if(-1 != time){
struct timespec ts = {time, 0};
@ -633,6 +662,18 @@ int FdEntity::Open(const headers_t* pmeta, off_t size, time_t time, int flags, A
int pseudo_fd = ppseudoinfo->GetPseudoFd();
pseudo_fd_map[pseudo_fd] = ppseudoinfo;
// if there is untreated area, set it to pseudo object.
if(0 < truncated_size){
if(!ppseudoinfo->AddUntreated(truncated_start, truncated_size)){
pseudo_fd_map.erase(pseudo_fd);
if(pfile){
fclose(pfile);
pfile = NULL;
}
delete ppseudoinfo;
}
}
return pseudo_fd;
}
@ -1317,6 +1358,7 @@ int FdEntity::NoCacheCompleteMultipartPost(PseudoFdInfo* pseudo_obj)
s3fscurl.DestroyCurlHandle();
// clear multipart upload info
pseudo_obj->ClearUntreated();
pseudo_obj->ClearUploadInfo(false);
return 0;
@ -1370,10 +1412,17 @@ int FdEntity::RowFlush(int fd, const char* tpath, bool force_sync)
return 0;
}
if(S3fsLog::IsS3fsLogDbg()){
pagelist.Dump();
}
int result;
if(nomultipart){
// No multipart upload
result = RowFlushNoMultipart(pseudo_obj, tpath);
}else if(FdEntity::streamupload){
// Stream multipart upload
result = RowFlushStreamMultipart(pseudo_obj, tpath);
}else if(FdEntity::mixmultipart){
// Mix multipart upload
result = RowFlushMixMultipart(pseudo_obj, tpath);
@ -1685,6 +1734,187 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
return result;
}
// [NOTE]
// Both fdent_lock and fdent_data_lock must be locked before calling.
//
// Flush for the stream-upload mode: parts may already have been uploaded by
// worker threads during Write, so this method only uploads the remaining
// (untreated) areas and then completes the multipart upload.
//
// Returns 0 on success or a negative errno-style code.
//
int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
{
    S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d][mix_upload=%s]", SAFESTRPTR(tpath), path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, (FdEntity::mixmultipart ? "true" : "false"));

    if(-1 == physical_fd || !pseudo_obj){
        return -EBADF;
    }
    int result;

    if(pagelist.Size() <= S3fsCurl::GetMultipartSize()){
        //
        // Use normal upload instead of multipart upload(too small part size)
        //
        // backup upload file size
        struct stat st;
        memset(&st, 0, sizeof(struct stat));
        if(-1 == fstat(physical_fd, &st)){
            // NOTE(review): on fstat failure st.st_size stays 0, so
            // size_orgmeta below is reset to 0 — confirm this is intended.
            S3FS_PRN_ERR("fstat is failed by errno(%d), but continue...", errno);
        }

        // If there are unloaded pages, they are loaded at here.
        if(0 != (result = Load(/*start=*/ 0, /*size=*/ 0, AutoLock::ALREADY_LOCKED))){
            S3FS_PRN_ERR("failed to load parts before uploading object(%d)", result);
            return result;
        }

        headers_t tmporgmeta = orgmeta;
        S3fsCurl s3fscurl(true);
        result = s3fscurl.PutRequest(path.c_str(), tmporgmeta, physical_fd);

        // reset uploaded file size
        size_orgmeta = st.st_size;

        pseudo_obj->ClearUntreated();

        if(0 == result){
            pagelist.ClearAllModified();
        }
    }else{
        //
        // Make upload/download/copy/cancel lists from file
        //
        mp_part_list_t  to_upload_list;
        mp_part_list_t  to_copy_list;
        mp_part_list_t  to_download_list;
        filepart_list_t cancel_uploaded_list;
        if(!pseudo_obj->ExtractUploadPartsFromAllArea(to_upload_list, to_copy_list, to_download_list, cancel_uploaded_list, S3fsCurl::GetMultipartSize(), pagelist.Size(), FdEntity::mixmultipart)){
            S3FS_PRN_ERR("Failed to extract various upload parts list from all area: errno(EIO)");
            return -EIO;
        }

        //
        // Check total size for downloading and Download
        //
        total_mp_part_list mptoal;
        off_t total_download_size = mptoal(to_download_list);
        if(0 < total_download_size){
            //
            // Check if there is enough free disk space for the total download size
            //
            if(!ReserveDiskSpace(total_download_size)){
                // no enough disk space
                //
                // [NOTE]
                // Because there is no left space size to download, we can't solve this anymore
                // in this case which is uploading in sequence.
                //
                S3FS_PRN_WARN("Not enough local storage(%lld byte) to cache write request for whole of the file: [path=%s][physical_fd=%d]", static_cast<long long int>(total_download_size), path.c_str(), physical_fd);
                return -ENOSPC;   // No space left on device
            }
            // enough disk space

            //
            // Download all parts
            //
            // [TODO]
            // Execute in parallel downloading with multiple thread.
            //
            for(mp_part_list_t::const_iterator download_iter = to_download_list.begin(); download_iter != to_download_list.end(); ++download_iter){
                if(0 != (result = Load(download_iter->start, download_iter->size, AutoLock::ALREADY_LOCKED))){
                    break;
                }
            }
            FdManager::FreeReservedDiskSpace(total_download_size);
            if(0 != result){
                S3FS_PRN_ERR("failed to load uninitialized area before writing(errno=%d)", result);
                return result;
            }
        }

        //
        // Has multipart uploading already started?
        //
        if(!pseudo_obj->IsUploading()){
            //
            // Multipart uploading hasn't started yet, so start it.
            //
            S3fsCurl    s3fscurl(true);
            std::string upload_id;
            if(0 != (result = s3fscurl.PreMultipartPostRequest(path.c_str(), orgmeta, upload_id, true))){
                S3FS_PRN_ERR("failed to setup multipart upload(create upload id) by errno(%d)", result);
                return result;
            }
            if(!pseudo_obj->InitialUploadInfo(upload_id)){
                S3FS_PRN_ERR("failed to setup multipart upload(set upload id to object)");
                return -EIO;
            }

            // Clear the dirty flag, because the meta data is updated.
            is_meta_pending = false;
        }

        //
        // Output debug level information
        //
        // When canceling(overwriting) a part that has already been uploaded, output it.
        //
        if(S3fsLog::IsS3fsLogDbg()){
            for(filepart_list_t::const_iterator cancel_iter = cancel_uploaded_list.begin(); cancel_iter != cancel_uploaded_list.end(); ++cancel_iter){
                S3FS_PRN_DBG("Cancel uploaded: start(%lld), size(%lld), part number(%d)", static_cast<long long int>(cancel_iter->startpos), static_cast<long long int>(cancel_iter->size), (cancel_iter->petag ? cancel_iter->petag->part_num : -1));
            }
        }

        //
        // Upload multipart and copy parts and wait exiting them
        //
        if(!pseudo_obj->ParallelMultipartUploadAll(path.c_str(), to_upload_list, to_copy_list, result)){
            S3FS_PRN_ERR("Failed to upload multipart parts.");
            pseudo_obj->ClearUntreated();
            pseudo_obj->ClearUploadInfo(false);     // clear multipart upload info
            return -EIO;
        }
        if(0 != result){
            S3FS_PRN_ERR("An error(%d) occurred in some threads that were uploading parallel multiparts, but continue to clean up..", result);
            pseudo_obj->ClearUntreated();
            pseudo_obj->ClearUploadInfo(false);     // clear multipart upload info
            return result;
        }

        //
        // Complete uploading
        //
        std::string upload_id;
        etaglist_t  etaglist;
        if(!pseudo_obj->GetUploadId(upload_id) || !pseudo_obj->GetEtaglist(etaglist)){
            S3FS_PRN_ERR("There is no upload id or etag list.");
            pseudo_obj->ClearUntreated();
            pseudo_obj->ClearUploadInfo(false);     // clear multipart upload info
            return -EIO;
        }else{
            S3fsCurl s3fscurl(true);
            if(0 != (result = s3fscurl.CompleteMultipartPostRequest(path.c_str(), upload_id, etaglist))){
                S3FS_PRN_ERR("failed to complete multipart upload by errno(%d)", result);
                pseudo_obj->ClearUntreated();
                pseudo_obj->ClearUploadInfo(false); // clear multipart upload info
                return result;
            }
            s3fscurl.DestroyCurlHandle();
        }
        pseudo_obj->ClearUntreated();
        pseudo_obj->ClearUploadInfo(false);         // clear multipart upload info

        // put pending headers
        if(0 != (result = UploadPendingMeta())){
            return result;
        }
    }
    pseudo_obj->ClearUntreated();

    if(0 == result){
        pagelist.ClearAllModified();
    }
    return result;
}
// [NOTICE]
// Need to lock before calling this method.
bool FdEntity::ReserveDiskSpace(off_t size)
@ -1798,6 +2028,12 @@ ssize_t FdEntity::Write(int fd, const char* bytes, off_t start, size_t size)
S3FS_PRN_ERR("failed to truncate temporary file(physical_fd=%d).", physical_fd);
return -errno;
}
// set untreated area
if(!pseudo_obj->AddUntreated(pagelist.Size(), (start - pagelist.Size()))){
S3FS_PRN_ERR("failed to set untreated area by incremental.");
return -EIO;
}
// add new area
pagelist.SetPageLoadedStatus(pagelist.Size(), start - pagelist.Size(), PageList::PAGE_MODIFIED);
}
@ -1806,6 +2042,9 @@ ssize_t FdEntity::Write(int fd, const char* bytes, off_t start, size_t size)
if(nomultipart){
// No multipart upload
wsize = WriteNoMultipart(pseudo_obj, bytes, start, size);
}else if(FdEntity::streamupload){
// Stream upload
wsize = WriteStreamUpload(pseudo_obj, bytes, start, size);
}else if(FdEntity::mixmultipart){
// Mix multipart upload
wsize = WriteMixMultipart(pseudo_obj, bytes, start, size);
@ -2055,6 +2294,54 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes,
return wsize;
}
//
// On Stream upload, the uploading is executed in another thread when the
// written area exceeds the maximum size of multipart upload.
//
// [NOTE]
// Both fdent_lock and fdent_data_lock must be locked before calling.
//
ssize_t FdEntity::WriteStreamUpload(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size)
{
    S3FS_PRN_DBG("[path=%s][pseudo_fd=%d][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, static_cast<long long int>(start), size);

    if(!pseudo_obj || -1 == physical_fd){
        S3FS_PRN_ERR("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not writable", (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, path.c_str());
        return -EBADF;
    }

    // Write the data through to the local cache file.
    ssize_t written = pwrite(physical_fd, bytes, size, start);
    if(-1 == written){
        S3FS_PRN_ERR("pwrite failed. errno(%d)", errno);
        return -errno;
    }
    if(0 < written){
        // Mark the written range as loaded+modified and register it as untreated.
        pagelist.SetPageLoadedStatus(start, written, PageList::PAGE_LOAD_MODIFIED);
        pseudo_obj->AddUntreated(start, written);
    }

    // If the last updated untreated area now exceeds the maximum part size,
    // hand that boundary-aligned area to the upload threads.
    headers_t tmporgmeta    = orgmeta;
    bool      was_uploading = pseudo_obj->IsUploading();
    int       result        = pseudo_obj->UploadBoundaryLastUntreatedArea(path.c_str(), tmporgmeta);
    if(0 != result){
        S3FS_PRN_ERR("Failed to upload the last untreated parts(area) : result=%d", result);
        return result;
    }
    if(!was_uploading && pseudo_obj->IsUploading()){
        // Clear the dirty flag, because the meta data is updated.
        is_meta_pending = false;
    }
    return written;
}
// [NOTE]
// Returns true if merged to orgmeta.
// If true is returned, the caller can update the header.

View File

@ -44,6 +44,7 @@ class FdEntity
};
static bool mixmultipart; // whether multipart uploading can use copy api.
static bool streamupload; // whether stream uploading.
pthread_mutex_t fdent_lock;
bool is_lock_init;
@ -81,13 +82,17 @@ class FdEntity
int RowFlushNoMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
int RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
int RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
int RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
ssize_t WriteNoMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
ssize_t WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
ssize_t WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
ssize_t WriteStreamUpload(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
public:
static bool GetNoMixMultipart() { return mixmultipart; }
static bool SetNoMixMultipart();
static bool GetStreamUpload() { return streamupload; }
static bool SetStreamUpload(bool isstream);
explicit FdEntity(const char* tpath = NULL, const char* cpath = NULL);
~FdEntity();

View File

@ -20,18 +20,108 @@
#include <cstdio>
#include <cstdlib>
#include <unistd.h>
#include <algorithm>
#include <fstream>
#include <iostream>
#include <climits>
#include "common.h"
#include "s3fs.h"
#include "fdcache_fdinfo.h"
#include "fdcache_pseudofd.h"
#include "autolock.h"
#include "curl.h"
#include "string_util.h"
#include "threadpoolman.h"
//------------------------------------------------
// PseudoFdInfo class variables
//------------------------------------------------
int PseudoFdInfo::max_threads = -1;
int PseudoFdInfo::opt_max_threads = -1;
//------------------------------------------------
// PseudoFdInfo class methods
//------------------------------------------------
//
// Worker function for uploading
//
// Runs on a thread-pool thread. "arg" is a pseudofdinfo_thparam describing a
// single part (raw upload or server-side copy). Each exit path decrements
// instruct_count, increments completed_count, and records the first failure
// in last_result, all under upload_list_lock.
//
// NOTE(review): pthparam is not deleted in this function; it is presumably
// freed by the thread-pool caller that owns the thpoolman_param — confirm.
//
void* PseudoFdInfo::MultipartUploadThreadWorker(void* arg)
{
    pseudofdinfo_thparam* pthparam = static_cast<pseudofdinfo_thparam*>(arg);
    if(!pthparam || !(pthparam->ppseudofdinfo)){
        return (void*)(intptr_t)(-EIO);
    }
    S3FS_PRN_INFO3("Upload Part Thread [tpath=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);

    // Fast-fail: if another part already failed, skip the work and just
    // update the counters.
    // NOTE(review): last_result is read here before taking upload_list_lock;
    // presumably an intentional benign race — confirm.
    if(0 != pthparam->ppseudofdinfo->last_result){
        S3FS_PRN_DBG("Already occurred error, thus this thread worker is exiting.");

        AutoLock auto_lock(&(pthparam->ppseudofdinfo->upload_list_lock));
        if(0 < pthparam->ppseudofdinfo->instruct_count){
            --(pthparam->ppseudofdinfo->instruct_count);
        }
        ++(pthparam->ppseudofdinfo->completed_count);
        return (void*)(intptr_t)(pthparam->ppseudofdinfo->last_result);
    }

    // setup and make curl object
    int       result = 0;
    S3fsCurl* s3fscurl;
    if(NULL == (s3fscurl = S3fsCurl::CreateParallelS3fsCurl(pthparam->path.c_str(), pthparam->upload_fd, pthparam->start, pthparam->size, pthparam->part_num, pthparam->is_copy, pthparam->petag, pthparam->upload_id, result))){
        S3FS_PRN_ERR("failed creating s3fs curl object for uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);

        // set result for exiting
        AutoLock auto_lock(&(pthparam->ppseudofdinfo->upload_list_lock));
        if(0 < pthparam->ppseudofdinfo->instruct_count){
            --(pthparam->ppseudofdinfo->instruct_count);
        }
        ++(pthparam->ppseudofdinfo->completed_count);
        if(0 != result){
            pthparam->ppseudofdinfo->last_result = result;
        }
        return (void*)(intptr_t)(result);
    }

    // Send request and get result
    if(0 == (result = s3fscurl->RequestPerform())){
        S3FS_PRN_DBG("succeed uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
        // MixMultipartPostComplete handles both upload and copy completion.
        if(!s3fscurl->MixMultipartPostComplete()){
            S3FS_PRN_ERR("failed completion uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
            result = -EIO;
        }
    }else{
        S3FS_PRN_ERR("failed uploading with error(%d) [path=%s][start=%lld][size=%lld][part=%d]", result, pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
    }
    s3fscurl->DestroyCurlHandle(true, false);
    delete s3fscurl;

    // set result
    {
        AutoLock auto_lock(&(pthparam->ppseudofdinfo->upload_list_lock));
        if(0 < pthparam->ppseudofdinfo->instruct_count){
            --(pthparam->ppseudofdinfo->instruct_count);
        }
        ++(pthparam->ppseudofdinfo->completed_count);
        if(0 != result){
            pthparam->ppseudofdinfo->last_result = result;
        }
    }
    return (void*)(intptr_t)(result);
}
//------------------------------------------------
// PseudoFdInfo methods
//------------------------------------------------
PseudoFdInfo::PseudoFdInfo(int fd, int open_flags) : pseudo_fd(-1), physical_fd(fd), flags(0) //, is_lock_init(false)
PseudoFdInfo::PseudoFdInfo(int fd, int open_flags) : pseudo_fd(-1), physical_fd(fd), flags(0), upload_fd(-1), uploaded_sem(0), instruct_count(0), completed_count(0), last_result(0)
{
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
@ -72,6 +162,47 @@ bool PseudoFdInfo::Clear()
pseudo_fd = -1;
physical_fd = -1;
CloseUploadFd(true); // [NOTE] already destroy mutex, then do not lock it.
return true;
}
// Close the duplicated upload file descriptor, if it is open.
//
// [FIX] Reset upload_fd to -1 after closing. Without the reset a later
// OpenUploadFd() call would see the stale (closed) descriptor value and
// treat it as "already initialized", reusing a dead fd.
void PseudoFdInfo::CloseUploadFd(bool lock_already_held)
{
    AutoLock auto_lock(&upload_list_lock, lock_already_held ? AutoLock::ALREADY_LOCKED : AutoLock::NONE);

    if(-1 != upload_fd){
        close(upload_fd);
        upload_fd = -1;
    }
}
// Prepare the file descriptor used by upload worker threads by duplicating
// physical_fd. Idempotent: returns true immediately if already open.
//
// [FIX] On every failure path upload_fd is now reset to -1 after close().
// Previously a failed lseek/fstat left upload_fd holding a closed descriptor,
// so the next call returned "already initialized" with a dead fd.
bool PseudoFdInfo::OpenUploadFd(bool lock_already_held)
{
    AutoLock auto_lock(&upload_list_lock, lock_already_held ? AutoLock::ALREADY_LOCKED : AutoLock::NONE);

    if(-1 != upload_fd){
        // already initialized
        return true;
    }
    if(-1 == physical_fd){
        S3FS_PRN_ERR("physical_fd is not initialized yet.");
        return false;
    }

    // duplicate fd and rewind it to the head of the file
    if(-1 == (upload_fd = dup(physical_fd)) || 0 != lseek(upload_fd, 0, SEEK_SET)){
        S3FS_PRN_ERR("Could not duplicate physical file descriptor(errno=%d)", errno);
        if(-1 != upload_fd){
            close(upload_fd);
            upload_fd = -1;
        }
        return false;
    }
    // sanity check that the duplicated descriptor is usable
    struct stat st;
    if(-1 == fstat(upload_fd, &st)){
        S3FS_PRN_ERR("Invalid file descriptor for uploading(errno=%d)", errno);
        close(upload_fd);
        upload_fd = -1;
        return false;
    }
    return true;
}
@ -114,23 +245,23 @@ bool PseudoFdInfo::ClearUploadInfo(bool is_cancel_mp, bool lock_already_held)
if(is_cancel_mp){
// [TODO]
// If we have any uploaded parts, we should delete them here.
// We haven't implemented it yet, but it will be implemented in the future.
// (User can delete them in the utility mode of s3fs.)
// If processing for cancellation is required, it will be processed here. Not implemented yet.
//
S3FS_PRN_INFO("Implementation of cancellation process for multipart upload is awaited.");
S3FS_PRN_DBG("If processing for cancellation is required, it will be processed here.");
}
upload_id.erase();
upload_list.clear();
ClearUntreated(true);
instruct_count = 0;
completed_count = 0;
last_result = 0;
return true;
}
bool PseudoFdInfo::InitialUploadInfo(const std::string& id)
bool PseudoFdInfo::InitialUploadInfo(const std::string& id, bool lock_already_held)
{
AutoLock auto_lock(&upload_list_lock);
AutoLock auto_lock(&upload_list_lock, lock_already_held ? AutoLock::ALREADY_LOCKED : AutoLock::NONE);
if(!ClearUploadInfo(true, true)){
return false;
@ -225,13 +356,6 @@ bool PseudoFdInfo::ClearUntreated(off_t start, off_t size)
return untreated_list.ClearParts(start, size);
}
bool PseudoFdInfo::GetUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size)
{
AutoLock auto_lock(&upload_list_lock);
return untreated_list.GetPart(start, size, max_size, min_size);
}
bool PseudoFdInfo::GetLastUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size)
{
AutoLock auto_lock(&upload_list_lock);
@ -243,7 +367,684 @@ bool PseudoFdInfo::AddUntreated(off_t start, off_t size)
{
AutoLock auto_lock(&upload_list_lock);
return untreated_list.AddPart(start, size);
bool result = untreated_list.AddPart(start, size);
if(!result){
S3FS_PRN_DBG("Failed adding untreated area part.");
}else if(S3fsLog::IsS3fsLogDbg()){
untreated_list.Dump();
}
return result;
}
// Fetch the most recently updated untreated area into start/size.
// Returns false when no last-updated part exists.
// NOTE(review): unlike AddUntreated, this does not take upload_list_lock;
// presumably callers already hold it — confirm.
bool PseudoFdInfo::GetLastUpdateUntreatedPart(off_t& start, off_t& size)
{
    return untreated_list.GetLastUpdatePart(start, size);
}
// Replace the last-updated untreated part after an upload consumed its middle:
// the front remainder (if any) overwrites the part, otherwise the part is
// removed; the behind remainder (if any) is appended as a new untreated part.
bool PseudoFdInfo::ReplaceLastUpdateUntreatedPart(off_t front_start, off_t front_size, off_t behind_start, off_t behind_size)
{
    // Shrink the last-updated part to the front remainder, or drop it entirely.
    bool succeeded = (0 < front_size)
                   ? untreated_list.ReplaceLastUpdatePart(front_start, front_size)
                   : untreated_list.RemoveLastUpdatePart();
    if(!succeeded){
        return false;
    }
    // Register the remainder behind the uploaded range as a new untreated part.
    if(0 < behind_size && !untreated_list.AddPart(behind_start, behind_size)){
        return false;
    }
    return true;
}
//
// Utility for sorting upload list
//
// [FIX] Changed "<=" to "<": std::list::sort requires a strict weak ordering,
// and a comparator returning true for equal elements violates irreflexivity
// (comp(a, a) must be false), which is undefined behavior.
//
static bool filepart_partnum_compare(const filepart& src1, const filepart& src2)
{
    return (src1.get_part_number() < src2.get_part_number());
}
// Append one part to upload_list (kept sorted by part number) and hand the
// caller a pointer to its etag slot via *ppetag.
//
// Preconditions: multipart upload must already be started (IsUploading()).
// Returns false on bad parameters or when no upload is in progress.
bool PseudoFdInfo::InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag, bool lock_already_held)
{
    //S3FS_PRN_DBG("[start=%lld][size=%lld][part_num=%d][is_copy=%s]", static_cast<long long int>(start), static_cast<long long int>(size), part_num, (is_copy ? "true" : "false"));

    if(!IsUploading()){
        S3FS_PRN_ERR("Multipart Upload has not started yet.");
        return false;
    }
    if(start < 0 || size <= 0 || part_num < 0 || !ppetag){
        S3FS_PRN_ERR("Parameters are wrong.");
        return false;
    }
    AutoLock auto_lock(&upload_list_lock, lock_already_held ? AutoLock::ALREADY_LOCKED : AutoLock::NONE);

    // insert new part
    // NOTE(review): the address of the element appended to etag_entities is
    // handed out below, so etag_entities must be a container whose elements
    // are never relocated (e.g. std::list) — confirm its declared type.
    etag_entities.push_back(etagpair(NULL, part_num));
    etagpair& etag_entity = etag_entities.back();
    filepart newpart(false, physical_fd, start, size, is_copy, &etag_entity);
    upload_list.push_back(newpart);

    // sort by part number
    upload_list.sort(filepart_partnum_compare);

    // set etag pointer
    *ppetag = &etag_entity;

    return true;
}
// [NOTE]
// This method only launches the upload thread.
// Check the maximum number of threads before calling.
//
// Queues one worker-thread job per part in mplist. "is_copy" selects raw
// upload (false) or server-side copy (true). Returns false as soon as any
// part fails to queue; parts queued before the failure keep running.
//
// [FIX] Removed the duplicated word in the error message
// ("Failed to insert insert upload part" -> "Failed to insert upload part").
//
bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy)
{
    //S3FS_PRN_DBG("[path=%s][mplist(%zu)]", SAFESTRPTR(path), mplist.size());

    if(mplist.empty()){
        // nothing to do
        return true;
    }
    if(!OpenUploadFd(true)){
        return false;
    }

    for(mp_part_list_t::const_iterator iter = mplist.begin(); iter != mplist.end(); ++iter){
        // Register the part in upload_list and get a stable etag slot for it.
        etagpair* petag = NULL;
        if(!InsertUploadPart(iter->start, iter->size, iter->part_num, is_copy, &petag, true)){
            S3FS_PRN_ERR("Failed to insert upload part(path=%s, start=%lld, size=%lld, part=%d, copy=%s) to mplist", SAFESTRPTR(path), static_cast<long long int>(iter->start), static_cast<long long int>(iter->size), iter->part_num, (is_copy ? "true" : "false"));
            return false;
        }

        // make parameter for my thread
        pseudofdinfo_thparam* thargs = new pseudofdinfo_thparam;
        thargs->ppseudofdinfo = this;
        thargs->path          = SAFESTRPTR(path);
        thargs->upload_id     = upload_id;
        thargs->upload_fd     = upload_fd;
        thargs->start         = iter->start;
        thargs->size          = iter->size;
        thargs->is_copy       = is_copy;
        thargs->part_num      = iter->part_num;
        thargs->petag         = petag;

        // make parameter for thread pool; the pool posts uploaded_sem when done
        thpoolman_param* ppoolparam = new thpoolman_param;
        ppoolparam->args  = thargs;
        ppoolparam->psem  = &uploaded_sem;
        ppoolparam->pfunc = PseudoFdInfo::MultipartUploadThreadWorker;

        // setup instruction; on failure both parameter objects are freed here
        if(!ThreadPoolMan::Instruct(ppoolparam)){
            S3FS_PRN_ERR("failed setup instruction for uploading.");
            delete ppoolparam;
            delete thargs;
            return false;
        }
        ++instruct_count;
    }
    return true;
}
// Queue all raw-upload parts and all server-side-copy parts, then block until
// every worker thread has finished. "result" receives the first error any
// worker reported (0 on success). Returns false only when queueing failed.
bool PseudoFdInfo::ParallelMultipartUploadAll(const char* path, const mp_part_list_t& upload_list, const mp_part_list_t& copy_list, int& result)
{
    S3FS_PRN_DBG("[path=%s][upload_list(%zu)][copy_list(%zu)]", SAFESTRPTR(path), upload_list.size(), copy_list.size());

    result = 0;

    if(!OpenUploadFd(true)){
        return false;
    }

    // Queue raw uploads first; only queue copies if that succeeded.
    bool queued = ParallelMultipartUpload(path, upload_list, false);
    if(queued){
        queued = ParallelMultipartUpload(path, copy_list, true);
    }
    if(!queued){
        S3FS_PRN_ERR("Failed setup instruction for uploading(path=%s, upload_list=%zu, copy_list=%zu).", SAFESTRPTR(path), upload_list.size(), copy_list.size());
        return false;
    }

    // Wait for all thread exiting
    result = WaitAllThreadsExit();
    return true;
}
//
// Upload the last updated Untreated area
//
// [Overview]
// Uploads untreated areas with the maximum multipart upload size as the
// boundary.
//
// * The starting position of the untreated area is aligned with the maximum
// multipart upload size as the boundary.
// * If there is an uploaded area that overlaps with the aligned untreated
// area, that uploaded area is canceled and absorbed by the untreated area.
// * Upload only when the aligned untreated area exceeds the maximum multipart
// upload size.
// * When the start position of the untreated area is changed to boundary
// alignment(to backward), and if that gap area is remained, that area is
// rest to untreated area.
//
ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_t& meta)
{
S3FS_PRN_DBG("[path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(path), pseudo_fd, physical_fd);
if(!path || -1 == physical_fd || -1 == pseudo_fd){
S3FS_PRN_ERR("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not writable", pseudo_fd, physical_fd, path);
return -EBADF;
}
AutoLock auto_lock(&upload_list_lock);
//
// Get last update untreated area
//
off_t last_untreated_start = 0;
off_t last_untreated_size = 0;
if(!GetLastUpdateUntreatedPart(last_untreated_start, last_untreated_size) || last_untreated_start < 0 || last_untreated_size <= 0){
S3FS_PRN_WARN("Not found last update untreated area or it is empty, thus return without any error.");
return 0;
}
//
// Aligns the start position of the last updated raw area with the boundary
//
// * Align the last updated raw space with the maximum upload size boundary.
// * The remaining size of the part before the boundary will not be uploaded.
//
off_t max_mp_size = S3fsCurl::GetMultipartSize();
off_t aligned_start = ((last_untreated_start / max_mp_size) + (0 < (last_untreated_start % max_mp_size) ? 1 : 0)) * max_mp_size;
if((last_untreated_start + last_untreated_size) <= aligned_start){
S3FS_PRN_INFO("After the untreated area(start=%lld, size=%lld) is aligned with the boundary, the aligned start(%lld) exceeds the untreated area, so there is nothing to do.", static_cast<long long int>(last_untreated_start), static_cast<long long int>(last_untreated_size), static_cast<long long int>(aligned_start));
return 0;
}
off_t aligned_size = (((last_untreated_start + last_untreated_size) - aligned_start) / max_mp_size) * max_mp_size;
if(0 == aligned_size){
S3FS_PRN_DBG("After the untreated area(start=%lld, size=%lld) is aligned with the boundary(start is %lld), the aligned size is empty, so nothing to do.", static_cast<long long int>(last_untreated_start), static_cast<long long int>(last_untreated_size), static_cast<long long int>(aligned_start));
return 0;
}
off_t front_rem_start = last_untreated_start; // start of the remainder untreated area in front of the boundary
off_t front_rem_size = aligned_start - last_untreated_start; // size of the remainder untreated area in front of the boundary
//
// Get the area for uploading, if last update treated area can be uploaded.
//
// [NOTE]
// * Create the updoad area list, if the untreated area aligned with the boundary
// exceeds the maximum upload size.
// * If it overlaps with an area that has already been uploaded(unloaded list),
// that area is added to the cancellation list and included in the untreated area.
//
mp_part_list_t to_upload_list;
filepart_list_t cancel_uploaded_list;
if(!ExtractUploadPartsFromUntreatedArea(aligned_start, aligned_size, to_upload_list, cancel_uploaded_list, S3fsCurl::GetMultipartSize())){
S3FS_PRN_ERR("Failed to extract upload parts from last untreated area.");
return -EIO;
}
if(to_upload_list.empty()){
S3FS_PRN_INFO("There is nothing to upload. In most cases, the untreated area does not meet the upload size.");
return 0;
}
//
// Has multipart uploading already started?
//
if(!IsUploading()){
// Multipart uploading hasn't started yet, so start it.
//
S3fsCurl s3fscurl(true);
std::string upload_id;
int result;
if(0 != (result = s3fscurl.PreMultipartPostRequest(path, meta, upload_id, true))){
S3FS_PRN_ERR("failed to setup multipart upload(create upload id) by errno(%d)", result);
return result;
}
if(!InitialUploadInfo(upload_id, true)){
S3FS_PRN_ERR("failed to setup multipart upload(set upload id to object)");
return result;
}
}
//
// Output debug level information
//
// When canceling(overwriting) a part that has already been uploaded, output it.
//
if(S3fsLog::IsS3fsLogDbg()){
for(filepart_list_t::const_iterator cancel_iter = cancel_uploaded_list.begin(); cancel_iter != cancel_uploaded_list.end(); ++cancel_iter){
S3FS_PRN_DBG("Cancel uploaded: start(%lld), size(%lld), part number(%d)", static_cast<long long int>(cancel_iter->startpos), static_cast<long long int>(cancel_iter->size), (cancel_iter->petag ? cancel_iter->petag->part_num : -1));
}
}
//
// Upload Multipart parts
//
if(!ParallelMultipartUpload(path, to_upload_list, false)){
S3FS_PRN_ERR("Failed to upload multipart parts.");
return -EIO;
}
//
// Exclude the uploaded Untreated area and update the last Untreated area.
//
off_t behind_rem_start = aligned_start + aligned_size;
off_t behind_rem_size = (last_untreated_start + last_untreated_size) - behind_rem_start;
if(!ReplaceLastUpdateUntreatedPart(front_rem_start, front_rem_size, behind_rem_start, behind_rem_size)){
S3FS_PRN_WARN("The last untreated area could not be detected and the uploaded area could not be excluded from it, but continue because it does not affect the overall processing.");
}
return 0;
}
//
// Wait until all worker threads instructed by this object have finished.
//
// instruct_count is the number of instructions handed to threads and
// completed_count is the number of completions; the worker posts
// uploaded_sem once per completion. This loop drains the semaphore and
// decrements completed_count until both counters reach zero.
//
// Returns last_result, the result code recorded by the thread processing.
//
int PseudoFdInfo::WaitAllThreadsExit()
{
    while(true){
        {
            AutoLock auto_lock(&upload_list_lock);
            if(0 == instruct_count && 0 == completed_count){
                // No outstanding and no unconsumed completions: done.
                break;
            }
            // Consume any completions that have already been posted
            // without blocking.
            while(uploaded_sem.try_wait()){
                if(0 < completed_count){
                    --completed_count;
                }
            }
            if(0 == instruct_count && 0 == completed_count){
                break;
            }
        }
        // need to wait the worker exiting
        // (block here, outside the lock, until the next completion is posted)
        uploaded_sem.wait();
        {
            AutoLock auto_lock(&upload_list_lock);
            if(0 < completed_count){
                --completed_count;
            }
        }
    }
    return last_result;
}
//
// Extract the list for multipart upload from the Untreated Area
//
// The untreated_start parameter must be set aligning it with the boundaries
// of the maximum multipart upload size. This method expects it to be bounded.
//
// This method creates the upload area aligned from the untreated area by
// maximum size and creates the required list.
// If it overlaps with an area that has already been uploaded, the overlapped
// upload area will be canceled and absorbed by the untreated area.
// If the list creation process is complete and areas smaller than the maximum
// size remain, those area will be reset to untreated_start and untreated_size
// and returned to the caller.
// If the called untreated area is smaller than the maximum size of the
// multipart upload, no list will be created.
//
// [NOTE]
// Maximum multipart upload size must be uploading boundary.
//
// [NOTE(review)]
// untreated_start/untreated_size are taken by non-const reference, and the
// header comment above says remainders are returned through them, but this
// implementation never writes them back (only the local aligned_start/
// aligned_size are updated). The caller currently depends on the parameters
// being unmodified after the call — confirm before changing.
//
bool PseudoFdInfo::ExtractUploadPartsFromUntreatedArea(off_t& untreated_start, off_t& untreated_size, mp_part_list_t& to_upload_list, filepart_list_t& cancel_upload_list, off_t max_mp_size)
{
    if(untreated_start < 0 || untreated_size <= 0){
        S3FS_PRN_ERR("Paramters are wrong(untreated_start=%lld, untreated_size=%lld).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size));
        return false;
    }

    // Initialize lists
    to_upload_list.clear();
    cancel_upload_list.clear();

    //
    // Align start position with maximum multipart upload boundaries
    // (round down to the boundary and grow the size by the shifted amount)
    //
    off_t aligned_start = (untreated_start / max_mp_size) * max_mp_size;
    off_t aligned_size  = untreated_size + (untreated_start - aligned_start);

    //
    // Check aligned untreated size
    //
    if(aligned_size < max_mp_size){
        S3FS_PRN_INFO("untreated area(start=%lld, size=%lld) to aligned boundary(start=%lld, size=%lld) is smaller than max mp size(%lld), so nothing to do.", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), static_cast<long long int>(aligned_start), static_cast<long long int>(aligned_size), static_cast<long long int>(max_mp_size));
        return true;    // successful termination
    }

    //
    // Check each uploaded area in list
    //
    // [NOTE]
    // The uploaded area must be to be aligned by boundary.
    // Also, it is assumed that it must not be a copy area.
    // So if the areas overlap, include uploaded area as an untreated area.
    //
    for(filepart_list_t::iterator cur_iter = upload_list.begin(); cur_iter != upload_list.end(); /* ++cur_iter */){
        // Check overlap
        if((cur_iter->startpos + cur_iter->size - 1) < aligned_start || (aligned_start + aligned_size - 1) < cur_iter->startpos){
            // Areas do not overlap
            ++cur_iter;

        }else{
            // The areas overlap
            //
            // Since the start position of the uploaded area is aligned with the boundary,
            // it is not necessary to check the start position.
            // If the uploaded area exceeds the untreated area, expand the untreated area.
            //
            if((aligned_start + aligned_size - 1) < (cur_iter->startpos + cur_iter->size - 1)){
                aligned_size += (cur_iter->startpos + cur_iter->size) - (aligned_start + aligned_size);
            }

            //
            // Add this to cancel list
            //
            cancel_upload_list.push_back(*cur_iter);    // Copy and Push to cancel list
            cur_iter = upload_list.erase(cur_iter);     // erase returns the next valid iterator
        }
    }

    //
    // Add upload area to the list
    // (slice the aligned area into max_mp_size chunks; part numbers are 1-based
    // and derived from the absolute file offset)
    //
    while(max_mp_size <= aligned_size){
        int part_num = (aligned_start / max_mp_size) + 1;
        to_upload_list.push_back(mp_part(aligned_start, max_mp_size, part_num));

        aligned_start += max_mp_size;
        aligned_size  -= max_mp_size;
    }

    return true;
}
//
// Extract the area lists to be uploaded/downloaded for the entire file.
//
// [Parameters]
// to_upload_list : A list of areas to upload in multipart upload.
// to_copy_list : A list of areas for copy upload in multipart upload.
// to_download_list : A list of areas that must be downloaded before multipart upload.
// cancel_upload_list : A list of areas that have already been uploaded and will be canceled(overwritten).
// file_size : The size of the upload file.
// use_copy : Specify true if copy multipart upload is available.
//
// [NOTE]
// The untreated_list does not change, but upload_list is changed.
// (If you want to restore it, you can use cancel_upload_list.)
//
// [NOTE]
// Takes upload_list_lock for the whole scan. Walks the file from offset 0 in
// max_mp_size steps ("current area"), classifying each step against the
// duplicated untreated list and the uploaded part list.
//
bool PseudoFdInfo::ExtractUploadPartsFromAllArea(mp_part_list_t& to_upload_list, mp_part_list_t& to_copy_list, mp_part_list_t& to_download_list, filepart_list_t& cancel_upload_list, off_t max_mp_size, off_t file_size, bool use_copy)
{
    AutoLock auto_lock(&upload_list_lock);

    // Initialize lists
    to_upload_list.clear();
    to_copy_list.clear();
    to_download_list.clear();
    cancel_upload_list.clear();

    // Duplicate untreated list (working copy; the member untreated_list is not changed)
    untreated_list_t dup_untreated_list;
    untreated_list.Duplicate(dup_untreated_list);

    // Initialize the iterator of each list first
    untreated_list_t::iterator dup_untreated_iter = dup_untreated_list.begin();
    filepart_list_t::iterator  uploaded_iter      = upload_list.begin();

    //
    // Loop to extract areas to upload and download
    //
    // Check at the boundary of the maximum upload size from the beginning of the file
    //
    for(off_t cur_start = 0, cur_size = 0; cur_start < file_size; cur_start += cur_size){
        //
        // Set part size
        // (To avoid confusion, the area to be checked is called the "current area".)
        //
        cur_size = ((cur_start + max_mp_size) <= file_size ? max_mp_size : (file_size - cur_start));

        //
        // Extract the untreated area that overlaps this current area.
        // (The extracted area is deleted from dup_untreated_list.)
        //
        untreated_list_t cur_untreated_list;
        for(cur_untreated_list.clear(); dup_untreated_iter != dup_untreated_list.end(); ){
            if((dup_untreated_iter->start < (cur_start + cur_size)) && (cur_start < (dup_untreated_iter->start + dup_untreated_iter->size))){
                // this untreated area is overlap
                off_t tmp_untreated_start;
                off_t tmp_untreated_size;
                if(dup_untreated_iter->start < cur_start){
                    // [NOTE]
                    // This untreated area overlaps with the current area, but starts
                    // in front of the target area.
                    // This state should not be possible, but if this state is detected,
                    // the part before the target area will be deleted.
                    //
                    tmp_untreated_start = cur_start;
                    tmp_untreated_size  = dup_untreated_iter->size - (cur_start - dup_untreated_iter->start);
                }else{
                    tmp_untreated_start = dup_untreated_iter->start;
                    tmp_untreated_size  = dup_untreated_iter->size;
                }

                //
                // Check the end of the overlapping untreated area.
                //
                if((tmp_untreated_start + tmp_untreated_size) <= (cur_start + cur_size)){
                    //
                    // All of untreated areas are within the current area
                    //
                    // - Add this untreated area to cur_untreated_list
                    // - Delete this from dup_untreated_list
                    //
                    cur_untreated_list.push_back(untreatedpart(tmp_untreated_start, tmp_untreated_size));
                    dup_untreated_iter = dup_untreated_list.erase(dup_untreated_iter);
                }else{
                    //
                    // The untreated area exceeds the end of the current area
                    //

                    // Adjust untreated area to the part inside the current area
                    tmp_untreated_size = (cur_start + cur_size) - tmp_untreated_start;

                    // Add adjusted untreated area to cur_untreated_list
                    cur_untreated_list.push_back(untreatedpart(tmp_untreated_start, tmp_untreated_size));

                    // Remove this adjusted untreated area from the area pointed
                    // to by dup_untreated_iter (keep the tail for the next current area).
                    dup_untreated_iter->size  = (dup_untreated_iter->start + dup_untreated_iter->size) - (cur_start + cur_size);
                    dup_untreated_iter->start = tmp_untreated_start + tmp_untreated_size;
                }

            }else if((cur_start + cur_size - 1) < dup_untreated_iter->start){
                // this untreated area is over the current area, thus break loop.
                break;
            }else{
                ++dup_untreated_iter;
            }
        }

        //
        // Check uploaded area
        //
        // [NOTE]
        // The uploaded area should be aligned with the maximum upload size boundary.
        // It also assumes that each size of uploaded area must be a maximum upload
        // size. Thus at most ONE uploaded part may overlap a current area; two
        // overlaps indicate a broken list and abort the whole extraction.
        //
        filepart_list_t::iterator overlap_uploaded_iter = upload_list.end();
        for(; uploaded_iter != upload_list.end(); ++uploaded_iter){
            if((cur_start < (uploaded_iter->startpos + uploaded_iter->size)) && (uploaded_iter->startpos < (cur_start + cur_size))){
                if(overlap_uploaded_iter != upload_list.end()){
                    //
                    // Something wrong in this uploaded area.
                    //
                    // This area is not aligned with the boundary, then this condition
                    // is unrecoverable and return failure.
                    //
                    S3FS_PRN_ERR("The uploaded list may not be the boundary for the maximum multipart upload size. No further processing is possible.");
                    return false;
                }
                // Set this iterator to overlap iter
                overlap_uploaded_iter = uploaded_iter;
            }else if((cur_start + cur_size - 1) < uploaded_iter->startpos){
                break;
            }
        }

        //
        // Create upload/download/cancel/copy list for this current area
        //
        int part_num = (cur_start / max_mp_size) + 1;
        if(cur_untreated_list.empty()){
            //
            // No untreated area was detected in this current area
            //
            if(overlap_uploaded_iter != upload_list.end()){
                //
                // This current area already uploaded, then nothing to add to lists.
                //
                S3FS_PRN_DBG("Already uploaded: start=%lld, size=%lld", static_cast<long long int>(cur_start), static_cast<long long int>(cur_size));
            }else{
                //
                // This current area has not been uploaded
                // (neither an uploaded area nor an untreated area.)
                //
                if(use_copy){
                    //
                    // Copy multipart upload available
                    //
                    S3FS_PRN_DBG("To copy: start=%lld, size=%lld", static_cast<long long int>(cur_start), static_cast<long long int>(cur_size));
                    to_copy_list.push_back(mp_part(cur_start, cur_size, part_num));
                }else{
                    //
                    // This current area needs to be downloaded and uploaded
                    //
                    S3FS_PRN_DBG("To download and upload: start=%lld, size=%lld", static_cast<long long int>(cur_start), static_cast<long long int>(cur_size));
                    to_download_list.push_back(mp_part(cur_start, cur_size));
                    to_upload_list.push_back(mp_part(cur_start, cur_size, part_num));
                }
            }
        }else{
            //
            // Found untreated area in this current area
            //
            if(overlap_uploaded_iter != upload_list.end()){
                //
                // This current area is also the uploaded area
                //
                // [NOTE]
                // The uploaded area is aligned with boundary, there are all data in
                // this current area locally(which includes all data of untreated area).
                // So this current area only needs to be uploaded again.
                //
                S3FS_PRN_DBG("Cancel upload: start=%lld, size=%lld", static_cast<long long int>(overlap_uploaded_iter->startpos), static_cast<long long int>(overlap_uploaded_iter->size));
                cancel_upload_list.push_back(*overlap_uploaded_iter);               // add this uploaded area to cancel_upload_list
                upload_list.erase(overlap_uploaded_iter);                           // remove it from upload_list

                S3FS_PRN_DBG("To upload: start=%lld, size=%lld", static_cast<long long int>(cur_start), static_cast<long long int>(cur_size));
                to_upload_list.push_back(mp_part(cur_start, cur_size, part_num));   // add new uploading area to list
            }else{
                //
                // No uploaded area overlap this current area
                // (Areas other than the untreated area must be downloaded.)
                //
                // [NOTE]
                // Need to consider the case where there is a gap between the start
                // of the current area and the untreated area.
                // This gap is the area that should normally be downloaded.
                // But it is the area that can be copied if we can use copy multipart
                // upload. Then If we can use copy multipart upload and the previous
                // area is used copy multipart upload, this gap will be absorbed by
                // the previous area.
                // Unifying the copy multipart upload area can reduce the number of
                // upload requests.
                //
                off_t tmp_cur_start  = cur_start;
                off_t tmp_cur_size   = cur_size;
                off_t changed_start  = cur_start;
                off_t changed_size   = cur_size;
                bool  first_area     = true;
                for(untreated_list_t::const_iterator tmp_cur_untreated_iter = cur_untreated_list.begin(); tmp_cur_untreated_iter != cur_untreated_list.end(); ++tmp_cur_untreated_iter, first_area = false){
                    if(tmp_cur_start < tmp_cur_untreated_iter->start){
                        //
                        // Detected a gap at the start of area
                        //
                        bool include_prev_copy_part = false;
                        if(first_area && use_copy && !to_copy_list.empty()){
                            //
                            // Make sure that the area of the last item in to_copy_list
                            // is contiguous with this current area.
                            //
                            // [NOTE]
                            // Areas can be unified if the total size of the areas is
                            // within 5GB and the remaining area after unification is
                            // larger than the minimum multipart upload size.
                            //
                            mp_part_list_t::reverse_iterator copy_riter = to_copy_list.rbegin();

                            if( (copy_riter->start + copy_riter->size) == tmp_cur_start &&
                                (copy_riter->size + (tmp_cur_untreated_iter->start - tmp_cur_start)) <= FIVE_GB &&
                                ((tmp_cur_start + tmp_cur_size) - (tmp_cur_untreated_iter->start - tmp_cur_start)) >= MIN_MULTIPART_SIZE )
                            {
                                //
                                // Unify to this area to previous copy area.
                                //
                                copy_riter->size += tmp_cur_untreated_iter->start - tmp_cur_start;
                                S3FS_PRN_DBG("Resize to copy: start=%lld, size=%lld", static_cast<long long int>(copy_riter->start), static_cast<long long int>(copy_riter->size));

                                changed_size -= (tmp_cur_untreated_iter->start - changed_start);
                                changed_start = tmp_cur_untreated_iter->start;
                                include_prev_copy_part = true;
                            }
                        }
                        if(!include_prev_copy_part){
                            //
                            // If this area is not unified, need to download this area
                            //
                            S3FS_PRN_DBG("To download: start=%lld, size=%lld", static_cast<long long int>(tmp_cur_start), static_cast<long long int>(tmp_cur_untreated_iter->start - tmp_cur_start));
                            to_download_list.push_back(mp_part(tmp_cur_start, tmp_cur_untreated_iter->start - tmp_cur_start));
                        }
                    }

                    //
                    // Set next start position
                    //
                    tmp_cur_size  = (tmp_cur_start + tmp_cur_size) - (tmp_cur_untreated_iter->start + tmp_cur_untreated_iter->size);
                    tmp_cur_start = tmp_cur_untreated_iter->start + tmp_cur_untreated_iter->size;
                }

                //
                // Add download area to list, if remaining size
                //
                if(0 < tmp_cur_size){
                    S3FS_PRN_DBG("To download: start=%lld, size=%lld", static_cast<long long int>(tmp_cur_start), static_cast<long long int>(tmp_cur_size));
                    to_download_list.push_back(mp_part(tmp_cur_start, tmp_cur_size));
                }

                //
                // Set upload area(whole of area) to list
                //
                S3FS_PRN_DBG("To upload: start=%lld, size=%lld", static_cast<long long int>(changed_start), static_cast<long long int>(changed_size));
                to_upload_list.push_back(mp_part(changed_start, changed_size, part_num));
            }
        }
    }
    return true;
}
/*

View File

@ -22,6 +22,28 @@
#define S3FS_FDCACHE_FDINFO_H_
#include "fdcache_untreated.h"
#include "psemaphore.h"
#include "metaheader.h"
//------------------------------------------------
// Structure of parameters to pass to thread
//------------------------------------------------
class PseudoFdInfo;
// Parameter pack handed to an upload worker thread for one multipart part.
struct pseudofdinfo_thparam
{
    PseudoFdInfo* ppseudofdinfo;    // PseudoFdInfo that issued the instruction (ownership not transferred here — confirm in worker)
    std::string   path;             // object path being uploaded
    std::string   upload_id;        // multipart upload id
    int           upload_fd;        // file descriptor to read the part data from
    off_t         start;            // start offset of this part within the file
    off_t         size;             // size of this part in bytes
    bool          is_copy;          // true when this part is uploaded by a copy request
    int           part_num;         // multipart part number (-1 when unset)
    etagpair*     petag;            // etag/part-number entity to receive the result

    pseudofdinfo_thparam() : ppseudofdinfo(NULL), path(""), upload_id(""), upload_fd(-1), start(0), size(0), is_copy(false), part_num(-1), petag(NULL) {}
};
//------------------------------------------------
// Class PseudoFdInfo
@ -29,19 +51,36 @@
class PseudoFdInfo
{
private:
int pseudo_fd;
int physical_fd;
int flags; // flags at open
std::string upload_id;
filepart_list_t upload_list;
UntreatedParts untreated_list; // list of untreated parts that have been written and not yet uploaded(for streamupload)
etaglist_t etag_entities; // list of etag string and part number entities(to maintain the etag entity even if MPPART_INFO is destroyed)
static int max_threads;
static int opt_max_threads; // for option value
bool is_lock_init;
pthread_mutex_t upload_list_lock; // protects upload_id and upload_list
int pseudo_fd;
int physical_fd;
int flags; // flags at open
std::string upload_id;
int upload_fd; // duplicated fd for uploading
filepart_list_t upload_list;
UntreatedParts untreated_list; // list of untreated parts that have been written and not yet uploaded(for streamupload)
etaglist_t etag_entities; // list of etag string and part number entities(to maintain the etag entity even if MPPART_INFO is destroyed)
bool is_lock_init;
pthread_mutex_t upload_list_lock; // protects upload_id and upload_list
Semaphore uploaded_sem; // use a semaphore to trigger an upload completion like event flag
volatile int instruct_count; // number of instructions for processing by threads
volatile int completed_count; // number of completed processes by thread
int last_result; // the result of thread processing
private:
static void* MultipartUploadThreadWorker(void* arg);
bool Clear();
void CloseUploadFd(bool lock_already_held = false);
bool OpenUploadFd(bool lock_already_held = false);
bool GetLastUpdateUntreatedPart(off_t& start, off_t& size);
bool ReplaceLastUpdateUntreatedPart(off_t front_start, off_t front_size, off_t behind_start, off_t behind_size);
bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy);
bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag, bool lock_already_held = false);
int WaitAllThreadsExit();
bool ExtractUploadPartsFromUntreatedArea(off_t& untreated_start, off_t& untreated_size, mp_part_list_t& to_upload_list, filepart_list_t& cancel_upload_list, off_t max_mp_size);
public:
PseudoFdInfo(int fd = -1, int open_flags = 0);
@ -55,7 +94,7 @@ class PseudoFdInfo
bool Set(int fd, int open_flags);
bool ClearUploadInfo(bool is_clear_part = false, bool lock_already_held = false);
bool InitialUploadInfo(const std::string& id);
bool InitialUploadInfo(const std::string& id, bool lock_already_held = false);
bool IsUploading() const { return !upload_id.empty(); }
bool GetUploadId(std::string& id) const;
@ -65,9 +104,12 @@ class PseudoFdInfo
void ClearUntreated(bool lock_already_held = false);
bool ClearUntreated(off_t start, off_t size);
bool GetUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE);
bool GetLastUntreated(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE);
bool AddUntreated(off_t start, off_t size);
bool ParallelMultipartUploadAll(const char* path, const mp_part_list_t& upload_list, const mp_part_list_t& copy_list, int& result);
ssize_t UploadBoundaryLastUntreatedArea(const char* path, headers_t& meta);
bool ExtractUploadPartsFromAllArea(mp_part_list_t& to_upload_list, mp_part_list_t& to_copy_list, mp_part_list_t& to_download_list, filepart_list_t& cancel_upload_list, off_t max_mp_size, off_t file_size, bool use_copy);
};
typedef std::map<int, class PseudoFdInfo*> fdinfo_map_t;

View File

@ -91,8 +91,9 @@ bool UntreatedParts::AddPart(off_t start, off_t size)
return true;
}else if((start + size) < iter->start){
// The part to add should be inserted before the current part.
// The part to add should be inserted before the current part.
untreated_list.insert(iter, untreatedpart(start, size, last_tag));
// success to stretch and compress existed parts
return true;
}
}
@ -133,90 +134,6 @@ bool UntreatedParts::RowGetPart(off_t& start, off_t& size, off_t max_size, off_t
return false;
}
// [NOTE]
// The part with the last tag cannot be taken out if it has not reached max_size.
//
// Take out (remove and return) one part from the untreated list.
//
// The part tagged with last_tag (the most recently updated part) is only
// taken out when it has reached max_size; older parts are taken out when
// they are at least min_size. The taken-out range is returned through
// start/size and removed (or shrunk) in the list.
//
// Returns true when a part was taken out, false when none qualified.
//
bool UntreatedParts::TakeoutPart(off_t& start, off_t& size, off_t max_size, off_t min_size)
{
    if(max_size <= 0 || min_size < 0 || max_size < min_size){
        S3FS_PRN_ERR("Paramter are wrong(max_size=%lld, min_size=%lld).", static_cast<long long int>(max_size), static_cast<long long int>(min_size));
        return false;
    }
    AutoLock auto_lock(&untreated_list_lock);

    // Check the overlap with the existing part and add the part.
    for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){
        if(iter->untreated_tag == last_tag){
            // Last updated part
            if(max_size <= iter->size){
                // Take out only when the maximum part size is exceeded
                start = iter->start;
                size  = max_size;

                iter->start = iter->start + max_size;
                iter->size  = iter->size - max_size;
                if(iter->size == 0){
                    // fully consumed: drop the empty entry
                    untreated_list.erase(iter);
                }
                return true;
            }
        }else{
            // Parts updated in the past
            if(min_size <= iter->size){
                if(iter->size <= max_size){
                    // Take out the whole part( min <= part size <= max )
                    start = iter->start;
                    size  = iter->size;
                    untreated_list.erase(iter);
                }else{
                    // Partially take out part( max < part size )
                    start = iter->start;
                    size  = max_size;

                    iter->start = iter->start + max_size;
                    iter->size  = iter->size - max_size;
                }
                return true;
            }
        }
    }
    return false;
}
// [NOTE]
// This method returns the part from the beginning, ignoring conditions
// such as whether it is being updated(the last updated part) or less
// than the minimum size.
//
// Take out (remove and return) up to max_size bytes from the first part in
// the untreated list, ignoring the last_tag and min_size conditions that
// TakeoutPart applies.
//
// Returns false when max_size is invalid or the list is empty.
//
bool UntreatedParts::TakeoutPartFromBegin(off_t& start, off_t& size, off_t max_size)
{
    if(max_size <= 0){
        S3FS_PRN_ERR("Paramter is wrong(max_size=%lld).", static_cast<long long int>(max_size));
        return false;
    }
    AutoLock auto_lock(&untreated_list_lock);

    if(untreated_list.empty()){
        return false;
    }
    untreated_list_t::iterator iter = untreated_list.begin();
    if(iter->size <= max_size){
        // Take out the whole part( part size <= max )
        start = iter->start;
        size  = iter->size;
        untreated_list.erase(iter);
    }else{
        // Take out only the leading max_size bytes and keep the remainder
        start = iter->start;
        size  = max_size;

        iter->start = iter->start + max_size;
        iter->size  = iter->size - max_size;
    }
    return true;
}
// [NOTE]
// If size is specified as 0, all areas(parts) after start will be deleted.
//
@ -251,7 +168,7 @@ bool UntreatedParts::ClearParts(off_t start, off_t size)
}
}else if(start < (iter->start + iter->size)){
// clear area overlaps with iter area(on the end side)
if(0 == size || (iter->start + iter->size) <= (start + size) ){
if(0 == size || (iter->start + iter->size) <= (start + size)){
// start to iter->end is clear
iter->size = start - iter->start;
}else{
@ -274,6 +191,85 @@ bool UntreatedParts::ClearParts(off_t start, off_t size)
return true;
}
//
// Get the last updated Untreated part
//
// Return the start/size of the part tagged with last_tag (the most recently
// updated part). The list is not modified.
//
// Returns false when no part carries last_tag.
//
bool UntreatedParts::GetLastUpdatePart(off_t& start, off_t& size)
{
    AutoLock auto_lock(&untreated_list_lock);

    for(untreated_list_t::const_iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){
        if(iter->untreated_tag == last_tag){
            start = iter->start;
            size  = iter->size;
            return true;
        }
    }
    return false;
}
//
// Replaces the last updated Untreated part.
//
// [NOTE]
// If size <= 0, delete that part
//
// Replace the range of the part tagged with last_tag by start/size.
// If size <= 0 the part is removed instead.
//
// Returns false when no part carries last_tag.
//
bool UntreatedParts::ReplaceLastUpdatePart(off_t start, off_t size)
{
    AutoLock auto_lock(&untreated_list_lock);

    for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){
        if(iter->untreated_tag == last_tag){
            if(0 < size){
                iter->start = start;
                iter->size  = size;
            }else{
                untreated_list.erase(iter);
            }
            return true;
        }
    }
    return false;
}
//
// Remove the last updated Untreated part.
//
// Remove the part tagged with last_tag (the most recently updated part).
//
// Returns false when no part carries last_tag.
//
bool UntreatedParts::RemoveLastUpdatePart()
{
    AutoLock auto_lock(&untreated_list_lock);

    for(untreated_list_t::iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){
        if(iter->untreated_tag == last_tag){
            untreated_list.erase(iter);
            return true;
        }
    }
    return false;
}
//
// Duplicate the internally untreated_list.
//
//
// Copy the internal untreated_list into the caller's list under the lock.
// The internal list is not changed. Always returns true.
//
bool UntreatedParts::Duplicate(untreated_list_t& list)
{
    AutoLock auto_lock(&untreated_list_lock);

    untreated_list_t copied(untreated_list);
    list.swap(copied);
    return true;
}
// Dump the whole untreated list to the debug log (start/size/tag per entry).
void UntreatedParts::Dump()
{
    AutoLock auto_lock(&untreated_list_lock);

    S3FS_PRN_DBG("untreated list = [");
    for(untreated_list_t::const_iterator iter = untreated_list.begin(); iter != untreated_list.end(); ++iter){
        // NOTE(review): "%ld" assumes untreated_tag is a long — confirm the tag type.
        S3FS_PRN_DBG("  {%014lld - %014lld : tag=%ld}", static_cast<long long int>(iter->start), static_cast<long long int>(iter->size), iter->untreated_tag);
    }
    S3FS_PRN_DBG("]");
}
/*
* Local variables:
* tab-width: 4

View File

@ -46,19 +46,18 @@ class UntreatedParts
bool empty();
bool AddPart(off_t start, off_t size);
// [NOTE]
// The following method does not return parts smaller than mini_size.
// You can avoid it by setting min_size to 0.
//
bool GetPart(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE) { return RowGetPart(start, size, max_size, min_size, false); }
bool GetLastUpdatedPart(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE) { return RowGetPart(start, size, max_size, min_size, true); }
bool TakeoutPart(off_t& start, off_t& size, off_t max_size, off_t min_size = MIN_MULTIPART_SIZE);
bool TakeoutPartFromBegin(off_t& start, off_t& size, off_t max_size);
bool ClearParts(off_t start, off_t size);
bool ClearAll() { return ClearParts(0, 0); }
bool GetLastUpdatePart(off_t& start, off_t& size);
bool ReplaceLastUpdatePart(off_t start, off_t size);
bool RemoveLastUpdatePart();
bool Duplicate(untreated_list_t& list);
void Dump();
};
#endif // S3FS_FDCACHE_UNTREATED_H_

View File

@ -42,6 +42,14 @@ class Semaphore
dispatch_release(sem);
}
void wait() { dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); }
bool try_wait()
{
if(0 == dispatch_semaphore_wait(sem, DISPATCH_TIME_NOW)){
return true;
}else{
return false;
}
}
void post() { dispatch_semaphore_signal(sem); }
int get_value() const { return value; }
@ -67,6 +75,15 @@ class Semaphore
r = sem_wait(&mutex);
} while (r == -1 && errno == EINTR);
}
bool try_wait()
{
int result;
do{
result = sem_trywait(&mutex);
}while(result == -1 && errno == EINTR);
return (0 == result);
}
void post() { sem_post(&mutex); }
int get_value() const { return value; }

View File

@ -47,6 +47,7 @@
#include "s3fs_help.h"
#include "s3fs_util.h"
#include "mpu_util.h"
#include "threadpoolman.h"
//-------------------------------------------------------------------
// Symbols
@ -95,6 +96,7 @@ static int max_keys_list_object = 1000;// default is 1000
static off_t max_dirty_data = 5LL * 1024LL * 1024LL * 1024LL;
static bool use_wtf8 = false;
static off_t fake_diskfree_size = -1; // default is not set(-1)
static int max_thread_count = 5; // default is 5
//-------------------------------------------------------------------
// Global functions : prototype
@ -4084,6 +4086,16 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar
S3fsCurl::SetMaxParallelCount(maxpara);
return 0;
}
if(is_prefix(arg, "max_thread_count=")){
int max_thcount = static_cast<int>(cvt_strtoofft(strchr(arg, '=') + sizeof(char), /*base=*/ 10));
if(0 >= max_thcount){
S3FS_PRN_EXIT("argument should be over 1: max_thread_count");
return -1;
}
max_thread_count = max_thcount;
S3FS_PRN_WARN("The max_thread_count option is not a formal option. Please note that it will change in the future.");
return 0;
}
if(is_prefix(arg, "fd_page_size=")){
S3FS_PRN_ERR("option fd_page_size is no longer supported, so skip this option.");
return 0;
@ -4164,6 +4176,11 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar
nocopyapi = true;
return 0;
}
if(0 == strcmp(arg, "streamupload")){
FdEntity::SetStreamUpload(true);
S3FS_PRN_WARN("The streamupload option is not a formal option. Please note that it will change in the future.");
return 0;
}
if(0 == strcmp(arg, "norenameapi")){
norenameapi = true;
return 0;
@ -4672,6 +4689,14 @@ int main(int argc, char* argv[])
max_dirty_data = -1;
}
if(!ThreadPoolMan::Initialize(max_thread_count)){
S3FS_PRN_EXIT("Could not create thread pool(%d)", max_thread_count);
S3fsCurl::DestroyS3fsCurl();
s3fs_destroy_global_ssl();
destroy_parser_xml_lock();
exit(EXIT_FAILURE);
}
// check free disk space
if(!FdManager::IsSafeDiskSpace(NULL, S3fsCurl::GetMultipartSize() * S3fsCurl::GetMaxParallelCount())){
S3FS_PRN_EXIT("There is no enough disk space for used as cache(or temporary) directory by s3fs.");
@ -4729,6 +4754,9 @@ int main(int argc, char* argv[])
}
fuse_opt_free_args(&custom_args);
// Destroy thread pool
ThreadPoolMan::Destroy();
// Destroy curl
if(!S3fsCurl::DestroyS3fsCurl()){
S3FS_PRN_WARN("Could not release curl library.");

261
src/threadpoolman.cpp Normal file
View File

@ -0,0 +1,261 @@
/*
* s3fs - FUSE-based file system backed by Amazon S3
*
* Copyright(C) 2007 Takeshi Nakatani <ggtakec.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <cstdio>
#include <cstdlib>
#include <algorithm>
#include <errno.h>
#include "common.h"
#include "s3fs.h"
#include "threadpoolman.h"
#include "autolock.h"
//------------------------------------------------
// ThreadPoolMan class variables
//------------------------------------------------
ThreadPoolMan* ThreadPoolMan::singleton = NULL;
//------------------------------------------------
// ThreadPoolMan class methods
//------------------------------------------------
bool ThreadPoolMan::Initialize(int count)
{
if(ThreadPoolMan::singleton){
S3FS_PRN_WARN("Already singleton for Thread Manager is existed, then re-create it.");
ThreadPoolMan::Destroy();
}
ThreadPoolMan::singleton = new ThreadPoolMan(count);
return true;
}
void ThreadPoolMan::Destroy()
{
    // Deleting the singleton stops and joins all worker threads in its destructor.
    if(NULL == ThreadPoolMan::singleton){
        return;
    }
    delete ThreadPoolMan::singleton;
    ThreadPoolMan::singleton = NULL;
}
bool ThreadPoolMan::Instruct(thpoolman_param* pparam)
{
    // Forward the instruction to the singleton's queue; fails when the
    // pool has not been initialized via Initialize() yet.
    ThreadPoolMan* pman = ThreadPoolMan::singleton;
    if(NULL == pman){
        S3FS_PRN_WARN("The singleton object is not initialized yet.");
        return false;
    }
    return pman->SetInstruction(pparam);
}
//
// Thread worker
//
void* ThreadPoolMan::Worker(void* arg)
{
ThreadPoolMan* psingleton = static_cast<ThreadPoolMan*>(arg);
if(!psingleton){
S3FS_PRN_ERR("The parameter for worker thread is invalid.");
return (void*)(intptr_t)(-EIO);
}
S3FS_PRN_INFO3("Start worker thread in ThreadPoolMan.");
while(!psingleton->is_exit){
// wait
psingleton->thpoolman_sem.wait();
if(psingleton->is_exit){
break;
}
// get instruction
thpoolman_param* pparam;
{
AutoLock auto_lock(&(psingleton->thread_list_lock));
if(!psingleton->instruction_list.empty()){
pparam = psingleton->instruction_list.front();
psingleton->instruction_list.pop_front();
if(!pparam){
S3FS_PRN_WARN("Got a semaphore, but the instruction is empty.");
}
}else{
S3FS_PRN_WARN("Got a semaphore, but there is no instruction.");
pparam = NULL;
}
}
if(pparam){
void* retval = pparam->pfunc(pparam->args);
int int_retval = (int)(intptr_t)(retval);
if(0 != int_retval){
S3FS_PRN_WARN("The instruction function returned with somthign error code(%d).", int_retval);
}
if(pparam->psem){
pparam->psem->post();
}
delete pparam;
}
}
return (void*)(intptr_t)(0);
}
//------------------------------------------------
// ThreadPoolMan methods
//------------------------------------------------
// Construct the pool and immediately start `count` worker threads.
// Any failure here is fatal (abort), because the pool is created once at
// startup and the rest of s3fs assumes it exists.
ThreadPoolMan::ThreadPoolMan(int count) : is_exit(false), thpoolman_sem(0)
{
    if(count < 1){
        S3FS_PRN_CRIT("Failed to creating singleton for Thread Manager, because thread count(%d) is under 1.", count);
        abort();
    }
    if(ThreadPoolMan::singleton){
        S3FS_PRN_CRIT("Already singleton for Thread Manager is existed.");
        abort();
    }

    is_lock_init = false;
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
#if S3FS_PTHREAD_ERRORCHECK
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
#endif

    int result;
    if(0 != (result = pthread_mutex_init(&thread_list_lock, &attr))){
        S3FS_PRN_CRIT("failed to init thread_list_lock: %d", result);
        abort();
    }
    is_lock_init = true;
    // [FIX] destroy the mutex attribute object once the mutex is initialized
    // (it was previously leaked; POSIX requires pthread_mutexattr_destroy)
    pthread_mutexattr_destroy(&attr);

    // create threads
    if(!StartThreads(count)){
        S3FS_PRN_ERR("Failed starting threads at initializing.");
        abort();
    }
}
// Stop and join every worker thread, then tear down the list lock.
ThreadPoolMan::~ThreadPoolMan()
{
    StopThreads();

    if(!is_lock_init){
        return;
    }
    int result = pthread_mutex_destroy(&thread_list_lock);
    if(0 != result){
        S3FS_PRN_CRIT("failed to destroy thread_list_lock: %d", result);
        abort();
    }
    is_lock_init = false;
}
// Ask all worker threads to exit, join them, and discard any instructions
// that were still queued (their parameter objects are freed here).
// Returns true (the join errors are only logged).
bool ThreadPoolMan::StopThreads()
{
    if(thread_list.empty()){
        // [FIX] the previous message ("Any threads are running now") said the
        // opposite of the actual state
        S3FS_PRN_INFO("No threads are running now, then nothing to do.");
        return true;
    }

    // all threads to exit
    is_exit = true;

    // post once per thread so every worker wakes up and sees is_exit
    // [FIX] use size_t instead of the non-standard uint typedef
    for(size_t waitcnt = thread_list.size(); 0 < waitcnt; --waitcnt){
        thpoolman_sem.post();
    }

    // wait for threads exiting
    for(thread_list_t::const_iterator iter = thread_list.begin(); iter != thread_list.end(); ++iter){
        void* retval = NULL;
        int   result = pthread_join(*iter, &retval);
        if(result){
            S3FS_PRN_ERR("failed pthread_join - result(%d)", result);
        }else{
            S3FS_PRN_DBG("succeed pthread_join - return code(%d)", (int)(intptr_t)(retval));
        }
    }
    thread_list.clear();

    // reset semaphore(to zero)
    while(thpoolman_sem.try_wait());

    // clear instructions that were never picked up by a worker
    for(thpoolman_params_t::const_iterator iter = instruction_list.begin(); iter != instruction_list.end(); ++iter){
        thpoolman_param* pparam = *iter;
        delete pparam;
    }
    instruction_list.clear();

    return true;
}
// Launch `count` worker threads after stopping any threads that are
// already running. On a pthread_create failure, the threads created so
// far are stopped again and false is returned.
bool ThreadPoolMan::StartThreads(int count)
{
    if(count < 1){
        S3FS_PRN_ERR("Failed to creating threads, because thread count(%d) is under 1.", count);
        return false;
    }

    // stop all thread if they are running.
    if(!StopThreads()){
        S3FS_PRN_ERR("Failed to stop existed threads.");
        return false;
    }

    // create all threads
    is_exit = false;
    for(int threadno = 0; threadno < count; ++threadno){
        pthread_t thread;
        int       result = pthread_create(&thread, NULL, ThreadPoolMan::Worker, static_cast<void*>(this));
        if(0 != result){
            S3FS_PRN_ERR("failed pthread_create with return code(%d)", result);
            StopThreads();      // if possible, stop all threads
            return false;
        }
        thread_list.push_back(thread);
    }
    return true;
}
// Queue one instruction and wake one worker thread to process it.
// Ownership of pparam passes to the pool (the worker deletes it).
bool ThreadPoolMan::SetInstruction(thpoolman_param* pparam)
{
    if(NULL == pparam){
        S3FS_PRN_ERR("The parameter value is NULL.");
        return false;
    }

    // append the parameter to the queue under the lock
    {
        AutoLock auto_lock(&thread_list_lock);
        instruction_list.push_back(pparam);
    }

    // one post() wakes exactly one waiting worker
    thpoolman_sem.post();

    return true;
}
/*
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: expandtab sw=4 ts=4 fdm=marker
* vim<600: expandtab sw=4 ts=4
*/

97
src/threadpoolman.h Normal file
View File

@ -0,0 +1,97 @@
/*
* s3fs - FUSE-based file system backed by Amazon S3
*
* Copyright(C) 2007 Randy Rizun <rrizun@gmail.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#ifndef S3FS_THREADPOOLMAN_H_
#define S3FS_THREADPOOLMAN_H_

// [FIX] include what this header uses directly (std::list, pthread_t)
// instead of relying on transitive includes from psemaphore.h
#include <pthread.h>
#include <list>

#include "psemaphore.h"

//------------------------------------------------
// Typedefs for functions and structures
//------------------------------------------------
//
// Prototype function
//
typedef void* (*thpoolman_worker)(void*);    // same as start_routine for pthread_create function

//
// Parameter structure
//
// [NOTE]
// The args member is a value that is an argument of the worker function.
// The psem member is allowed NULL. If it is not NULL, the post() method is
// called when finishing the function.
//
struct thpoolman_param
{
    void*            args;
    Semaphore*       psem;
    thpoolman_worker pfunc;

    thpoolman_param() : args(NULL), psem(NULL), pfunc(NULL) {}
};

typedef std::list<thpoolman_param*> thpoolman_params_t;

typedef std::list<pthread_t> thread_list_t;

//------------------------------------------------
// Class ThreadPoolMan
//------------------------------------------------
// Singleton pool of worker threads. Instructions are queued through
// Instruct() and executed by one of the pooled threads.
//
class ThreadPoolMan
{
    private:
        static ThreadPoolMan* singleton;

        volatile bool         is_exit;              // shutdown flag observed by workers
        Semaphore             thpoolman_sem;        // counts queued instructions / exit wake-ups
        bool                  is_lock_init;
        pthread_mutex_t       thread_list_lock;     // guards instruction_list
        thread_list_t         thread_list;
        thpoolman_params_t    instruction_list;

    private:
        static void* Worker(void* arg);

        explicit ThreadPoolMan(int count = 1);
        ~ThreadPoolMan();

        bool StopThreads();
        bool StartThreads(int count);
        bool SetInstruction(thpoolman_param* pparam);

    public:
        static bool Initialize(int count);
        static void Destroy();
        static bool Instruct(thpoolman_param* pparam);
};

#endif // S3FS_THREADPOOLMAN_H_
/*
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: expandtab sw=4 ts=4 fdm=marker
* vim<600: expandtab sw=4 ts=4
*/

View File

@ -243,7 +243,7 @@ struct filepart
petag = petagobj;
}
int get_part_number()
int get_part_number() const
{
if(!petag){
return -1;
@ -282,9 +282,13 @@ struct untreatedpart
untreated_tag = 0;
}
// [NOTE]
// Check if the areas overlap
// However, even if the areas do not overlap, this method returns true if areas are adjacent.
//
bool check_overlap(off_t chk_start, off_t chk_size)
{
if(chk_start < 0 || chk_size <= 0 || (chk_start + chk_size) < start || (start + size) < chk_start){
if(chk_start < 0 || chk_size <= 0 || start < 0 || size <= 0 || (chk_start + chk_size) < start || (start + size) < chk_start){
return false;
}
return true;
@ -308,6 +312,32 @@ struct untreatedpart
typedef std::list<untreatedpart> untreated_list_t;
//
// Information on each part of multipart upload
//
struct mp_part
{
    off_t start;        // offset of this part within the whole object
    off_t size;         // byte length of this part
    int   part_num;     // Set only for information to upload

    mp_part(off_t part_start = 0, off_t part_size = 0, int num = 0) :
        start(part_start),
        size(part_size),
        part_num(num)
    {
    }
};

typedef std::list<struct mp_part> mp_part_list_t;

//
// Functor that sums up the byte sizes of all parts in a mp_part_list_t.
//
struct total_mp_part_list
{
    off_t operator()(const mp_part_list_t& mplist) const
    {
        off_t total = 0;
        for(mp_part_list_t::const_iterator part = mplist.begin(); part != mplist.end(); ++part){
            total += part->size;
        }
        return total;
    }
};
//-------------------------------------------------------------------
// mimes_t
//-------------------------------------------------------------------

View File

@ -52,6 +52,7 @@ if [ -n "${ALL_TESTS}" ]; then
sigv4
"singlepart_copy_limit=10" # limit size to exercise multipart code paths
#use_sse # TODO: S3Proxy does not support SSE
"use_cache=${CACHE_DIR} -o ensure_diskfree=${ENSURE_DISKFREE_SIZE} -o fake_diskfree=${FAKE_FREE_DISK_SIZE} -o streamupload"
)
else
FLAGS=(