Refactored for standardizing content and copy handling for Multipart Upload

This commit is contained in:
Takeshi Nakatani 2024-07-15 06:40:05 +00:00 committed by Andrew Gaul
parent 143284b2f3
commit 499577c2a9
8 changed files with 270 additions and 361 deletions

View File

@ -1227,7 +1227,6 @@ bool S3fsCurl::MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param)
if(!s3fscurl || param){ // this callback does not need a parameter
return false;
}
return s3fscurl->MultipartUploadPartComplete();
}
@ -1239,8 +1238,7 @@ bool S3fsCurl::MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param)
if(!s3fscurl || param){ // this callback does not need a parameter
return false;
}
return s3fscurl->MixMultipartUploadComplete();
return s3fscurl->MultipartUploadPartComplete();
}
std::unique_ptr<S3fsCurl> S3fsCurl::MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl)
@ -1283,7 +1281,7 @@ std::unique_ptr<S3fsCurl> S3fsCurl::MultipartUploadPartRetryCallback(S3fsCurl* s
newcurl->type = s3fscurl->type;
// setup new curl object
if(0 != newcurl->MultipartUploadPartSetup(s3fscurl->path.c_str(), part_num, upload_id)){
if(0 != newcurl->MultipartUploadContentPartSetup(s3fscurl->path.c_str(), part_num, upload_id)){
S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num);
return nullptr;
}
@ -1327,7 +1325,7 @@ std::unique_ptr<S3fsCurl> S3fsCurl::CopyMultipartUploadRetryCallback(S3fsCurl* s
newcurl->type = s3fscurl->type;
// setup new curl object
if(0 != newcurl->CopyMultipartUploadSetup(s3fscurl->b_from.c_str(), s3fscurl->path.c_str(), part_num, upload_id, s3fscurl->b_meta)){
if(0 != newcurl->MultipartUploadCopyPartSetup(s3fscurl->b_from.c_str(), s3fscurl->path.c_str(), part_num, upload_id, s3fscurl->b_meta)){
S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num);
return nullptr;
}
@ -1374,69 +1372,6 @@ int S3fsCurl::MapPutErrorResponse(int result)
return result;
}
// [NOTE]
// It is a factory method as utility because it requires an S3fsCurl object
// initialized for multipart upload from outside this class.
//
std::unique_ptr<S3fsCurl> S3fsCurl::CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result)
{
// duplicate fd
if(!tpath || -1 == fd || start < 0 || size <= 0 || !petag){
S3FS_PRN_ERR("Parameters are wrong: tpath(%s), fd(%d), start(%lld), size(%lld), petag(%s)", SAFESTRPTR(tpath), fd, static_cast<long long int>(start), static_cast<long long int>(size), (petag ? "not null" : "null"));
result = -EIO;
return nullptr;
}
result = 0;
std::unique_ptr<S3fsCurl> s3fscurl(new S3fsCurl(true));
if(!is_copy){
s3fscurl->partdata.fd = fd;
s3fscurl->partdata.startpos = start;
s3fscurl->partdata.size = size;
s3fscurl->partdata.is_copy = is_copy;
s3fscurl->partdata.petag = petag; // [NOTE] be careful, the value is set directly
s3fscurl->b_partdata_startpos = s3fscurl->partdata.startpos;
s3fscurl->b_partdata_size = s3fscurl->partdata.size;
S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(start), static_cast<long long>(size), part_num);
if(0 != (result = s3fscurl->MultipartUploadPartSetup(tpath, part_num, upload_id))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return nullptr;
}
}else{
headers_t meta;
std::string srcresource;
std::string srcurl;
MakeUrlResource(get_realpath(tpath).c_str(), srcresource, srcurl);
meta["x-amz-copy-source"] = srcresource;
std::ostringstream strrange;
strrange << "bytes=" << start << "-" << (start + size - 1);
meta["x-amz-copy-source-range"] = strrange.str();
s3fscurl->b_from = SAFESTRPTR(tpath);
s3fscurl->b_meta = meta;
s3fscurl->partdata.petag = petag; // [NOTE] be careful, the value is set directly
S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(start), static_cast<long long>(size), part_num);
if(0 != (result = s3fscurl->CopyMultipartUploadSetup(tpath, tpath, part_num, upload_id, meta))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return nullptr;
}
}
// Call lazy function
if(!s3fscurl->fpLazySetup || !s3fscurl->fpLazySetup(s3fscurl.get())){
S3FS_PRN_ERR("failed lazy function setup for uploading part");
result = -EIO;
return nullptr;
}
return s3fscurl;
}
int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd)
{
int result;
@ -1475,7 +1410,7 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, const headers_t&
s3fscurl_para->partdata.add_etag_list(list);
// initiate upload part for parallel
if(0 != (result = s3fscurl_para->MultipartUploadPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
if(0 != (result = s3fscurl_para->MultipartUploadContentPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}
@ -1548,7 +1483,7 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me
S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(iter->offset), static_cast<long long>(iter->bytes), s3fscurl_para->partdata.get_part_number());
// initiate upload part for parallel
if(0 != (result = s3fscurl_para->MultipartUploadPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
if(0 != (result = s3fscurl_para->MultipartUploadContentPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}
@ -1586,7 +1521,7 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me
S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(iter->offset + i), static_cast<long long>(bytes), s3fscurl_para->partdata.get_part_number());
// initiate upload part for parallel
if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
if(0 != (result = s3fscurl_para->MultipartUploadCopyPartSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}
@ -3877,6 +3812,63 @@ int S3fsCurl::PreMultipartUploadRequest(const char* tpath, const headers_t& meta
return 0;
}
int S3fsCurl::MultipartUploadPartSetup(const char* tpath, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy)
{
// duplicate upload_fd
if(!tpath || start < 0 || size <= 0 || !petag || (!is_copy && -1 == upload_fd)){
S3FS_PRN_ERR("Parameters are wrong: path(%s), upload_fd(%d), start(%lld), size(%lld), petag(%s), is_copy(%s)", SAFESTRPTR(tpath), upload_fd, static_cast<long long int>(start), static_cast<long long int>(size), (petag ? "not null" : "null"), (is_copy ? "true" : "false"));
return -EIO;
}
int result = 0;
if(!is_copy){
partdata.fd = upload_fd;
partdata.startpos = start;
partdata.size = size;
partdata.is_copy = is_copy;
partdata.set_etag(petag); // [NOTE] be careful, the value is set directly
b_partdata_startpos = start;
b_partdata_size = size;
S3FS_PRN_INFO3("Upload Part [path=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long int>(start), static_cast<long long int>(size), part_num);
if(0 != (result = MultipartUploadContentPartSetup(tpath, part_num, upload_id))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}
}else{
headers_t meta;
std::string srcresource;
std::string srcurl;
MakeUrlResource(get_realpath(tpath).c_str(), srcresource, srcurl);
meta["x-amz-copy-source"] = srcresource;
std::ostringstream strrange;
strrange << "bytes=" << start << "-" << (start + size - 1);
meta["x-amz-copy-source-range"] = strrange.str();
b_from = SAFESTRPTR(tpath);
b_meta = meta;
partdata.set_etag(petag); // [NOTE] be careful, the value is set directly
S3FS_PRN_INFO3("Copy Part [path=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long int>(start), static_cast<long long int>(size), part_num);
if(0 != (result = MultipartUploadCopyPartSetup(tpath, tpath, part_num, upload_id, meta))){
S3FS_PRN_ERR("failed uploading copy part setup(%d)", result);
return result;
}
}
// Call lazy function
if(!fpLazySetup || !fpLazySetup(this)){
S3FS_PRN_ERR("failed lazy function setup for uploading part");
return -EIO;
}
return 0;
}
int S3fsCurl::MultipartUploadComplete(const char* tpath, const std::string& upload_id, const etaglist_t& parts)
{
S3FS_PRN_INFO3("[tpath=%s][parts=%zu]", SAFESTRPTR(tpath), parts.size());
@ -4085,7 +4077,7 @@ int S3fsCurl::AbortMultipartUpload(const char* tpath, const std::string& upload_
// Content-MD5: pUNXr/BjKK5G2UKvaRRrOA==
// Authorization: AWS VGhpcyBtZXNzYWdlIHNpZ25lZGGieSRlbHZpbmc=
//
int S3fsCurl::MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id)
int S3fsCurl::MultipartUploadContentPartSetup(const char* tpath, int part_num, const std::string& upload_id)
{
S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long int>(partdata.startpos), static_cast<long long int>(partdata.size), part_num);
@ -4145,37 +4137,7 @@ int S3fsCurl::MultipartUploadPartSetup(const char* tpath, int part_num, const st
return 0;
}
int S3fsCurl::MultipartUploadPartRequest(const char* tpath, int part_num, const std::string& upload_id)
{
int result;
S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long int>(partdata.startpos), static_cast<long long int>(partdata.size), part_num);
// setup
if(0 != (result = S3fsCurl::MultipartUploadPartSetup(tpath, part_num, upload_id))){
return result;
}
if(!fpLazySetup || !fpLazySetup(this)){
S3FS_PRN_ERR("Failed to lazy setup in multipart upload part request.");
return -EIO;
}
// request
if(0 == (result = RequestPerform())){
if(!MultipartUploadPartComplete()){
result = -EIO;
}
}
// closing
bodydata.clear();
headdata.clear();
return result;
}
int S3fsCurl::CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta)
int S3fsCurl::MultipartUploadCopyPartSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta)
{
S3FS_PRN_INFO3("[from=%s][to=%s][part=%d]", SAFESTRPTR(from), SAFESTRPTR(to), part_num);
@ -4241,7 +4203,7 @@ int S3fsCurl::CopyMultipartUploadSetup(const char* from, const char* to, int par
return 0;
}
bool S3fsCurl::MultipartUploadPartComplete()
bool S3fsCurl::MultipartUploadContentPartComplete()
{
auto it = responseHeaders.find("ETag");
if (it == responseHeaders.cend()) {
@ -4267,26 +4229,27 @@ bool S3fsCurl::MultipartUploadPartComplete()
return true;
}
bool S3fsCurl::CopyMultipartUploadComplete()
bool S3fsCurl::MultipartUploadCopyPartComplete()
{
std::string etag;
partdata.uploaded = simple_parse_xml(bodydata.c_str(), bodydata.size(), "ETag", etag);
partdata.petag->etag = peeloff(std::move(etag));
bodydata.clear();
headdata.clear();
return true;
}
bool S3fsCurl::MixMultipartUploadComplete()
bool S3fsCurl::MultipartUploadPartComplete()
{
bool result;
if(-1 == partdata.fd){
result = CopyMultipartUploadComplete();
result = MultipartUploadCopyPartComplete();
}else{
result = MultipartUploadPartComplete();
result = MultipartUploadContentPartComplete();
}
bodydata.clear();
headdata.clear();
return result;
}
@ -4297,7 +4260,7 @@ int S3fsCurl::MultipartPutHeadRequest(const std::string& from, const std::string
int result;
// setup
if(0 != (result = CopyMultipartUploadSetup(from.c_str(), to.c_str(), part_number, upload_id, meta))){
if(0 != (result = MultipartUploadCopyPartSetup(from.c_str(), to.c_str(), part_number, upload_id, meta))){
S3FS_PRN_ERR("failed multipart put head request setup(from=%s, to=%s, part_number=%d, upload_id=%s) : %d", from.c_str(), to.c_str(), part_number, upload_id.c_str(), result);
return result;
}
@ -4314,27 +4277,34 @@ int S3fsCurl::MultipartPutHeadRequest(const std::string& from, const std::string
return 0;
}
int S3fsCurl::MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, etagpair* petagpair)
int S3fsCurl::MultipartUploadPartRequest(const char* tpath, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy)
{
S3FS_PRN_INFO3("[upload_id=%s][tpath=%s][fd=%d][offset=%lld][size=%lld]", upload_id.c_str(), SAFESTRPTR(tpath), fd, static_cast<long long int>(offset), static_cast<long long int>(size));
S3FS_PRN_INFO3("Multipart Upload Part [tpath=%s][upload_fd=%d][start=%lld][size=%lld][part_num=%d][upload_id=%s][is_copy=%s]", SAFESTRPTR(tpath), upload_fd, static_cast<long long int>(start), static_cast<long long int>(size), part_num, upload_id.c_str(), (is_copy ? "true" : "false"));
// set
partdata.fd = fd;
partdata.startpos = offset;
partdata.size = size;
b_partdata_startpos = partdata.startpos;
b_partdata_size = partdata.size;
partdata.set_etag(petagpair);
// upload part
//
// Setup
//
int result;
if(0 != (result = MultipartUploadPartRequest(tpath, petagpair->part_num, upload_id))){
S3FS_PRN_ERR("failed uploading %d part by error(%d)", petagpair->part_num, result);
if(0 != (result = MultipartUploadPartSetup(tpath, upload_fd, start, size, part_num, upload_id, petag, is_copy))){
S3FS_PRN_ERR("Failed pre-setup for Multipart Upload Part [tpath=%s][upload_fd=%d][start=%lld][size=%lld][part_num=%d][upload_id=%s][is_copy=%s]", SAFESTRPTR(tpath), upload_fd, static_cast<long long int>(start), static_cast<long long int>(size), part_num, upload_id.c_str(), (is_copy ? "true" : "false"));
return result;
}
DestroyCurlHandle();
return 0;
//
// Send request
//
if(0 == (result = RequestPerform())){
S3FS_PRN_DBG("Succeed Multipart Upload Part [tpath=%s][upload_fd=%d][start=%lld][size=%lld][part_num=%d][upload_id=%s][is_copy=%s]", SAFESTRPTR(tpath), upload_fd, static_cast<long long int>(start), static_cast<long long int>(size), part_num, upload_id.c_str(), (is_copy ? "true" : "false"));
if(!MultipartUploadPartComplete()){
S3FS_PRN_ERR("Failed completion for Multipart Upload Part [tpath=%s][upload_fd=%d][start=%lld][size=%lld][part_num=%d][upload_id=%s][is_copy=%s]", SAFESTRPTR(tpath), upload_fd, static_cast<long long int>(start), static_cast<long long int>(size), part_num, upload_id.c_str(), (is_copy ? "true" : "false"));
result = -EIO;
}
}else{
S3FS_PRN_ERR("Failed Multipart Upload Part with error(%d) [tpath=%s][upload_fd=%d][start=%lld][size=%lld][part_num=%d][upload_id=%s][is_copy=%s]", result, SAFESTRPTR(tpath), upload_fd, static_cast<long long int>(start), static_cast<long long int>(size), part_num, upload_id.c_str(), (is_copy ? "true" : "false"));
}
return result;
}
/*

View File

@ -277,10 +277,10 @@ class S3fsCurl
}
std::string CalcSignatureV2(const std::string& method, const std::string& strMD5, const std::string& content_type, const std::string& date, const std::string& resource, const std::string& secret_access_key, const std::string& access_token);
std::string CalcSignature(const std::string& method, const std::string& canonical_uri, const std::string& query_string, const std::string& strdate, const std::string& payload_hash, const std::string& date8601, const std::string& secret_access_key, const std::string& access_token);
int MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id);
int CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta);
bool MultipartUploadPartComplete();
bool CopyMultipartUploadComplete();
int MultipartUploadContentPartSetup(const char* tpath, int part_num, const std::string& upload_id);
int MultipartUploadCopyPartSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta);
bool MultipartUploadContentPartComplete();
bool MultipartUploadCopyPartComplete();
int MapPutErrorResponse(int result);
public:
@ -289,7 +289,6 @@ class S3fsCurl
static bool InitCredentialObject(S3fsCred* pcredobj);
static bool InitMimeType(const std::string& strFile);
static bool DestroyS3fsCurl();
static std::unique_ptr<S3fsCurl> CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result);
static int ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd);
static int ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages);
@ -377,13 +376,13 @@ class S3fsCurl
int CheckBucket(const char* check_path, bool compat_dir, bool force_no_sse);
int ListBucketRequest(const char* tpath, const char* query);
int PreMultipartUploadRequest(const char* tpath, const headers_t& meta, std::string& upload_id);
int MultipartUploadPartSetup(const char* tpath, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy);
int MultipartUploadComplete(const char* tpath, const std::string& upload_id, const etaglist_t& parts);
int MultipartUploadPartRequest(const char* tpath, int part_num, const std::string& upload_id);
bool MixMultipartUploadComplete();
bool MultipartUploadPartComplete();
int MultipartListRequest(std::string& body);
int AbortMultipartUpload(const char* tpath, const std::string& upload_id);
int MultipartPutHeadRequest(const std::string& from, const std::string& to, int part_number, const std::string& upload_id, const headers_t& meta);
int MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, etagpair* petagpair);
int MultipartUploadPartRequest(const char* tpath, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy);
// methods(variables)
const std::string& GetPath() const { return path; }

View File

@ -51,25 +51,6 @@
//------------------------------------------------
static constexpr int MAX_MULTIPART_CNT = 10 * 1000; // S3 multipart max count
//------------------------------------------------
// Structure of parameters to pass to thread
//------------------------------------------------
//
// Multipart Upload Request parameter structure for Thread Pool.
//
// ([TODO] This is a temporary structure is moved when S3fsMultiCurl is deprecated.)
//
struct multipart_upload_req_thparam
{
std::string path;
std::string upload_id;
int fd = -1;
off_t start = 0;
off_t size = 0;
etagpair* petagpair = nullptr;
int result = 0;
};
//------------------------------------------------
// FdEntity class variables
//------------------------------------------------
@ -129,25 +110,6 @@ ino_t FdEntity::GetInode(int fd)
return st.st_ino;
}
//
// Worker function for multipart upload request
//
// ([TODO] This is a temporary structure is moved when S3fsMultiCurl is deprecated.)
//
void* FdEntity::MultipartUploadThreadWorker(void* arg)
{
auto* pthparam = static_cast<multipart_upload_req_thparam*>(arg);
if(!pthparam){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Multipart Upload Request [path=%s][upload id=%s][fd=%d][start=%lld][size=%lld][etagpair=%p]", pthparam->path.c_str(), pthparam->upload_id.c_str(), pthparam->fd, static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->petagpair);
S3fsCurl s3fscurl(true);
pthparam->result = s3fscurl.MultipartUploadRequest(pthparam->upload_id, pthparam->path.c_str(), pthparam->fd, pthparam->start, pthparam->size, pthparam->petagpair);
return reinterpret_cast<void*>(pthparam->result);
}
//------------------------------------------------
// FdEntity methods
//------------------------------------------------
@ -1262,9 +1224,6 @@ int FdEntity::NoCacheLoadAndPost(PseudoFdInfo* pseudo_obj, off_t start, off_t si
return result;
}
//
// Common method that calls S3fsCurl::PreMultipartUploadRequest via pre_multipart_upload_request
//
// [NOTE]
// If the request is successful, initialize upload_id.
//
@ -1275,19 +1234,11 @@ int FdEntity::PreMultipartUploadRequest(PseudoFdInfo* pseudo_obj)
return -EIO;
}
// get upload_id
std::string upload_id;
int result;
if(0 != (result = pre_multipart_upload_request(path, orgmeta, upload_id))){
if(0 != (result = pseudo_obj->PreMultipartUploadRequest(path, orgmeta))){
return result;
}
// reset upload_id
if(!pseudo_obj->InitialUploadInfo(upload_id)){
S3FS_PRN_ERR("failed to initialize upload id(%s)", upload_id.c_str());
return -EIO;
}
// Clear the dirty flag, because the meta data is updated.
pending_status = pending_status_t::NO_UPDATE_PENDING;
@ -1320,8 +1271,6 @@ int FdEntity::NoCachePreMultipartUploadRequest(PseudoFdInfo* pseudo_obj)
// At no disk space for caching object.
// This method is uploading one part of multipart.
//
// ([TODO] This is a temporary modification till S3fsMultiCurl is deprecated.)
//
int FdEntity::NoCacheMultipartUploadRequest(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size)
{
if(-1 == tgfd || !pseudo_obj || !pseudo_obj->IsUploading()){
@ -1329,40 +1278,23 @@ int FdEntity::NoCacheMultipartUploadRequest(PseudoFdInfo* pseudo_obj, int tgfd,
return -EIO;
}
// parameter for thread worker
multipart_upload_req_thparam thargs;
thargs.path = path;
thargs.upload_id.clear();
thargs.fd = tgfd;
thargs.start = start;
thargs.size = size;
thargs.petagpair = nullptr;
thargs.result = 0;
// get upload id
if(!pseudo_obj->GetUploadId(thargs.upload_id)){
std::string upload_id;
if(!pseudo_obj->GetUploadId(upload_id)){
return -EIO;
}
// append new part and get it's etag string pointer
if(!pseudo_obj->AppendUploadPart(start, size, false, &(thargs.petagpair))){
etagpair* petag = nullptr;
if(!pseudo_obj->AppendUploadPart(start, size, false, &petag)){
return -EIO;
}
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = FdEntity::MultipartUploadThreadWorker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Get Object Request Thread Worker");
return -EIO;
}
if(0 != thargs.result){
S3FS_PRN_ERR("Multipart Upload Request(path=%s, upload_id=%s, fd=%d, start=%lld, size=%lld) returns with error(%d)", path.c_str(), thargs.upload_id.c_str(), tgfd, static_cast<long long int>(start), static_cast<long long int>(size), thargs.result);
return thargs.result;
// request to thread
int result;
if(0 != (result = await_multipart_upload_part_request(path, tgfd, start, size, petag->part_num, upload_id, petag, false))){
S3FS_PRN_ERR("Failed No Cache Multipart Upload Part Request by error(%d) [path=%s][upload_id=%s][fd=%d][start=%lld][size=%lld]", result, path.c_str(), upload_id.c_str(), tgfd, static_cast<long long int>(start), static_cast<long long int>(size));
return result;
}
return 0;
}

View File

@ -80,7 +80,6 @@ class FdEntity : public std::enable_shared_from_this<FdEntity>
private:
static int FillFile(int fd, unsigned char byte, off_t size, off_t start);
static ino_t GetInode(int fd);
static void* MultipartUploadThreadWorker(void* arg); // ([TODO] This is a temporary method is moved when S3fsMultiCurl is deprecated.)
void Clear();
ino_t GetInode() const REQUIRES(FdEntity::fdent_data_lock);

View File

@ -40,92 +40,10 @@
#include "threadpoolman.h"
#include "s3fs_threadreqs.h"
//------------------------------------------------
// Structure of parameters to pass to thread
//------------------------------------------------
// [NOTE]
// The processing related to this is currently temporarily implemented
// in this file, but will be moved to a separate file at a later.
//
struct pseudofdinfo_mpupload_thparam
{
PseudoFdInfo* ppseudofdinfo = nullptr;
std::string path;
std::string upload_id;
int upload_fd = -1;
off_t start = 0;
off_t size = 0;
bool is_copy = false;
int part_num = -1;
etagpair* petag = nullptr;
};
//------------------------------------------------
// PseudoFdInfo class methods
//------------------------------------------------
//
// Thread Worker function for uploading
//
void* PseudoFdInfo::MultipartUploadThreadWorker(void* arg)
{
std::unique_ptr<pseudofdinfo_mpupload_thparam> pthparam(static_cast<pseudofdinfo_mpupload_thparam*>(arg));
if(!pthparam || !(pthparam->ppseudofdinfo)){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Upload Part Thread [tpath=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
int result;
{
const std::lock_guard<std::mutex> lock(pthparam->ppseudofdinfo->upload_list_lock);
if(0 != (result = pthparam->ppseudofdinfo->last_result)){
S3FS_PRN_DBG("Already occurred error, thus this thread worker is exiting.");
if(!pthparam->ppseudofdinfo->CompleteInstruction(result)){ // result will be overwritten with the same value.
result = -EIO;
}
return reinterpret_cast<void*>(result);
}
}
// setup and make curl object
std::unique_ptr<S3fsCurl> s3fscurl(S3fsCurl::CreateParallelS3fsCurl(pthparam->path.c_str(), pthparam->upload_fd, pthparam->start, pthparam->size, pthparam->part_num, pthparam->is_copy, pthparam->petag, pthparam->upload_id, result));
if(nullptr == s3fscurl){
S3FS_PRN_ERR("failed creating s3fs curl object for uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
// set result for exiting
const std::lock_guard<std::mutex> lock(pthparam->ppseudofdinfo->upload_list_lock);
if(!pthparam->ppseudofdinfo->CompleteInstruction(result)){
result = -EIO;
}
return reinterpret_cast<void*>(result);
}
// Send request and get result
if(0 == (result = s3fscurl->RequestPerform())){
S3FS_PRN_DBG("succeed uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
if(!s3fscurl->MixMultipartUploadComplete()){
S3FS_PRN_ERR("failed completion uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
result = -EIO;
}
}else{
S3FS_PRN_ERR("failed uploading with error(%d) [path=%s][start=%lld][size=%lld][part=%d]", result, pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
}
s3fscurl->DestroyCurlHandle(false);
// set result
const std::lock_guard<std::mutex> lock(pthparam->ppseudofdinfo->upload_list_lock);
if(!pthparam->ppseudofdinfo->CompleteInstruction(result)){
S3FS_PRN_WARN("This thread worker is about to end, so it doesn't return an EIO here and runs to the end.");
}
return reinterpret_cast<void*>(result);
}
//------------------------------------------------
// PseudoFdInfo methods
//------------------------------------------------
PseudoFdInfo::PseudoFdInfo(int fd, int open_flags) : pseudo_fd(-1), physical_fd(fd), flags(0), upload_fd(-1), instruct_count(0), completed_count(0), last_result(0), uploaded_sem(0)
PseudoFdInfo::PseudoFdInfo(int fd, int open_flags) : pseudo_fd(-1), physical_fd(fd), flags(0), upload_fd(-1), instruct_count(0), last_result(0), uploaded_sem(0)
{
if(-1 != physical_fd){
pseudo_fd = PseudoFdManager::Get();
@ -272,7 +190,6 @@ bool PseudoFdInfo::ResetUploadInfo()
upload_id.clear();
upload_list.clear();
instruct_count = 0;
completed_count = 0;
last_result = 0;
return true;
@ -306,22 +223,6 @@ void PseudoFdInfo::IncreaseInstructionCount()
++instruct_count;
}
bool PseudoFdInfo::CompleteInstruction(int result)
{
if(0 != result){
last_result = result;
}
if(0 >= instruct_count){
S3FS_PRN_ERR("Internal error: instruct_count caused an underflow.");
return false;
}
--instruct_count;
++completed_count;
return true;
}
bool PseudoFdInfo::GetUploadInfo(std::string& id, int& fd) const
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
@ -438,10 +339,6 @@ bool PseudoFdInfo::InsertUploadPart(off_t start, off_t size, int part_num, bool
return true;
}
// [NOTE]
// This method only launches the upload thread.
// Check the maximum number of threads before calling.
//
bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy)
{
//S3FS_PRN_DBG("[path=%s][mplist(%zu)]", SAFESTRPTR(path), mplist.size());
@ -461,39 +358,24 @@ bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_
return false;
}
std::string strpath = SAFESTRPTR(path);
for(auto iter = mplist.cbegin(); iter != mplist.cend(); ++iter){
// Insert upload part
etagpair* petag = nullptr;
if(!InsertUploadPart(iter->start, iter->size, iter->part_num, is_copy, &petag)){
S3FS_PRN_ERR("Failed to insert insert upload part(path=%s, start=%lld, size=%lld, part=%d, copy=%s) to mplist", SAFESTRPTR(path), static_cast<long long int>(iter->start), static_cast<long long int>(iter->size), iter->part_num, (is_copy ? "true" : "false"));
S3FS_PRN_ERR("Failed to insert Multipart Upload Part to mplist [path=%s][start=%lld][size=%lld][part_num=%d][is_copy=%s]", strpath.c_str(), static_cast<long long int>(iter->start), static_cast<long long int>(iter->size), iter->part_num, (is_copy ? "true" : "false"));
return false;
}
// make parameter for my thread
auto* thargs = new pseudofdinfo_mpupload_thparam;
thargs->ppseudofdinfo = this;
thargs->path = SAFESTRPTR(path);
thargs->upload_id = tmp_upload_id;
thargs->upload_fd = tmp_upload_fd;
thargs->start = iter->start;
thargs->size = iter->size;
thargs->is_copy = is_copy;
thargs->part_num = iter->part_num;
thargs->petag = petag;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = thargs;
ppoolparam.psem = &uploaded_sem;
ppoolparam.pfunc = PseudoFdInfo::MultipartUploadThreadWorker;
// setup instruction
if(!ThreadPoolMan::Instruct(ppoolparam)){
S3FS_PRN_ERR("failed setup instruction for uploading.");
delete thargs;
// setup instruction and request on another thread
int result;
if(0 != (result = multipart_upload_part_request(strpath, tmp_upload_fd, iter->start, iter->size, iter->part_num, tmp_upload_id, petag, is_copy, &uploaded_sem, &upload_list_lock, &last_result))){
S3FS_PRN_ERR("failed setup instruction for Multipart Upload Part Request by erro(%d) [path=%s][start=%lld][size=%lld][part_num=%d][is_copy=%s]", result, strpath.c_str(), static_cast<long long int>(iter->start), static_cast<long long int>(iter->size), iter->part_num, (is_copy ? "true" : "false"));
return false;
}
// Count up the number of internally managed threads
IncreaseInstructionCount();
}
return true;
@ -525,22 +407,23 @@ bool PseudoFdInfo::ParallelMultipartUploadAll(const char* path, const mp_part_li
// [NOTE]
// If the request is successful, initialize upload_id.
//
bool PseudoFdInfo::PreMultipartUploadRequest(const std::string& strpath, const headers_t& meta)
int PseudoFdInfo::PreMultipartUploadRequest(const std::string& strpath, const headers_t& meta)
{
// get upload_id
std::string new_upload_id;
if(0 != pre_multipart_upload_request(strpath, meta, new_upload_id)){
return false;
int result;
if(0 != (result = pre_multipart_upload_request(strpath, meta, new_upload_id))){
return result;
}
// reset upload_id
if(!RowInitialUploadInfo(new_upload_id, false/* not need to cancel */)){
S3FS_PRN_ERR("failed to setup multipart upload(set upload id to object)");
return false;
return -EIO;
}
S3FS_PRN_DBG("succeed to setup multipart upload(set upload id to object)");
return true;
return 0;
}
//
@ -626,8 +509,9 @@ ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_
//
if(!IsUploading()){
std::string strpath = SAFESTRPTR(path);
if(!PreMultipartUploadRequest(strpath, meta)){
return -EIO;
int result;
if(0 != (result = PreMultipartUploadRequest(strpath, meta))){
return result;
}
}
@ -669,7 +553,7 @@ int PseudoFdInfo::WaitAllThreadsExit()
bool is_loop = true;
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
if(0 == instruct_count && 0 == completed_count){
if(0 == instruct_count){
result = last_result;
is_loop = false;
}
@ -680,10 +564,7 @@ int PseudoFdInfo::WaitAllThreadsExit()
uploaded_sem.acquire();
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
if(0 < completed_count){
--completed_count;
}
if(0 == instruct_count && 0 == completed_count){
if(0 == --instruct_count){
// break loop
result = last_result;
is_loop = false;
@ -699,7 +580,7 @@ bool PseudoFdInfo::CancelAllThreads()
bool need_cancel = false;
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
if(0 < instruct_count && 0 < completed_count){
if(0 < instruct_count){
S3FS_PRN_INFO("The upload thread is running, so cancel them and wait for the end.");
need_cancel = true;
last_result = -ECANCELED; // to stop thread running

View File

@ -48,24 +48,19 @@ class PseudoFdInfo
filepart_list_t upload_list GUARDED_BY(upload_list_lock);
petagpool etag_entities GUARDED_BY(upload_list_lock); // list of etag string and part number entities(to maintain the etag entity even if MPPART_INFO is destroyed)
int instruct_count GUARDED_BY(upload_list_lock); // number of instructions for processing by threads
int completed_count GUARDED_BY(upload_list_lock); // number of completed processes by thread
int last_result GUARDED_BY(upload_list_lock); // the result of thread processing
Semaphore uploaded_sem; // use a semaphore to trigger an upload completion like event flag
private:
static void* MultipartUploadThreadWorker(void* arg);
bool Clear();
void CloseUploadFd();
bool OpenUploadFd();
bool ResetUploadInfo() REQUIRES(upload_list_lock);
bool RowInitialUploadInfo(const std::string& id, bool is_cancel_mp);
void IncreaseInstructionCount();
bool CompleteInstruction(int result) REQUIRES(upload_list_lock);
bool GetUploadInfo(std::string& id, int& fd) const;
bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy);
bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag);
bool PreMultipartUploadRequest(const std::string& strpath, const headers_t& meta);
bool CancelAllThreads();
bool ExtractUploadPartsFromUntreatedArea(off_t untreated_start, off_t untreated_size, mp_part_list_t& to_upload_list, filepart_list_t& cancel_upload_list, off_t max_mp_size);
bool IsUploadingHasLock() const REQUIRES(upload_list_lock);
@ -95,6 +90,7 @@ class PseudoFdInfo
bool AppendUploadPart(off_t start, off_t size, bool is_copy = false, etagpair** ppetag = nullptr);
bool ParallelMultipartUploadAll(const char* path, const mp_part_list_t& to_upload_list, const mp_part_list_t& copy_list, int& result);
int PreMultipartUploadRequest(const std::string& strpath, const headers_t& meta);
int WaitAllThreadsExit();
ssize_t UploadBoundaryLastUntreatedArea(const char* path, headers_t& meta, FdEntity* pfdent) REQUIRES(pfdent->GetMutex());

View File

@ -299,6 +299,46 @@ void* pre_multipart_upload_req_threadworker(void* arg)
return reinterpret_cast<void*>(pthparam->result);
}
//
// Worker function for pre multipart upload part request
//
void* multipart_upload_part_req_threadworker(void* arg)
{
auto* pthparam = static_cast<multipart_upload_part_req_thparam*>(arg);
if(!pthparam || !pthparam->pthparam_lock || !pthparam->petag || !pthparam->presult){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Multipart Upload Part Worker [path=%s][upload_id=%s][upload_fd=%d][start=%lld][size=%lld][is_copy=%s][part_num=%d]", pthparam->path.c_str(), pthparam->upload_id.c_str(), pthparam->upload_fd, static_cast<long long int>(pthparam->start), static_cast<long long int>(pthparam->size), (pthparam->is_copy ? "true" : "false"), pthparam->part_num);
//
// Check last thread result
//
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
if(0 != *(pthparam->presult)){
S3FS_PRN_DBG("Already occurred error(%d), thus this thread worker is exiting.", *(pthparam->presult));
return reinterpret_cast<void*>(*(pthparam->presult));
}
}
//
// Request
//
S3fsCurl s3fscurl(true);
int result;
if(0 != (result = s3fscurl.MultipartUploadPartRequest(pthparam->path.c_str(), pthparam->upload_fd, pthparam->start, pthparam->size, pthparam->part_num, pthparam->upload_id, pthparam->petag, pthparam->is_copy))){
S3FS_PRN_ERR("Failed Multipart Upload Part Worker with error(%d) [path=%s][upload_id=%s][upload_fd=%d][start=%lld][size=%lld][is_copy=%s][part_num=%d]", result, pthparam->path.c_str(), pthparam->upload_id.c_str(), pthparam->upload_fd, static_cast<long long int>(pthparam->start), static_cast<long long int>(pthparam->size), (pthparam->is_copy ? "true" : "false"), pthparam->part_num);
}
// Set result for exiting
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
*(pthparam->presult) = result;
}
return reinterpret_cast<void*>(result);
}
//
// Worker function for complete multipart upload request
//
@ -833,6 +873,78 @@ int pre_multipart_upload_request(const std::string& path, const headers_t& meta,
return 0;
}
//
// Calls S3fsCurl::MultipartUploadPartRequest via multipart_upload_part_req_threadworker
//
int multipart_upload_part_request(const std::string& path, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy, Semaphore* psem, std::mutex* pthparam_lock, int* req_result)
{
// parameter for thread worker
auto* thargs = new multipart_upload_part_req_thparam; // free in multipart_upload_part_req_threadworker
thargs->path = path;
thargs->upload_id = upload_id;
thargs->upload_fd = upload_fd;
thargs->start = start;
thargs->size = size;
thargs->is_copy = is_copy;
thargs->part_num = part_num;
thargs->pthparam_lock = pthparam_lock;
thargs->petag = petag;
thargs->presult = req_result;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = thargs;
ppoolparam.psem = psem;
ppoolparam.pfunc = multipart_upload_part_req_threadworker;
// send request by thread
if(!ThreadPoolMan::Instruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Multipart Upload Part Thread Worker [path=%s][upload_id=%s][upload_fd=%d][start=%lld][size=%lld][is_copy=%s][part_num=%d]", path.c_str(), upload_id.c_str(), upload_fd, static_cast<long long int>(start), static_cast<long long int>(size), (is_copy ? "true" : "false"), part_num);;
return -EIO;
}
return 0;
}
//
// Calls and Await S3fsCurl::MultipartUploadPartRequest via multipart_upload_part_req_threadworker
//
int await_multipart_upload_part_request(const std::string& path, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy)
{
std::mutex thparam_lock;
int req_result = 0;
// parameter for thread worker
auto* thargs = new multipart_upload_part_req_thparam; // free in multipart_upload_part_req_threadworker
thargs->path = path;
thargs->upload_id = upload_id;
thargs->upload_fd = upload_fd;
thargs->start = start;
thargs->size = size;
thargs->is_copy = is_copy;
thargs->part_num = part_num;
thargs->pthparam_lock = &thparam_lock;
thargs->petag = petag;
thargs->presult = &req_result;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = multipart_upload_part_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Await Multipart Upload Part Thread Worker [path=%s][upload_id=%s][upload_fd=%d][start=%lld][size=%lld][is_copy=%s][part_num=%d]", path.c_str(), upload_id.c_str(), upload_fd, static_cast<long long int>(start), static_cast<long long int>(size), (is_copy ? "true" : "false"), part_num);;
return -EIO;
}
if(0 != req_result){
S3FS_PRN_ERR("Await Multipart Upload Part Request by error(%d) [path=%s][upload_id=%s][upload_fd=%d][start=%lld][size=%lld][is_copy=%s][part_num=%d]", req_result, path.c_str(), upload_id.c_str(), upload_fd, static_cast<long long int>(start), static_cast<long long int>(size), (is_copy ? "true" : "false"), part_num);
return req_result;
}
return 0;
}
//
// Calls S3fsCurl::MultipartUploadComplete via complete_multipart_upload_threadworker
//

View File

@ -124,6 +124,23 @@ struct pre_multipart_upload_req_thparam
int result = 0;
};
//
// Multipart Upload Part Request parameter structure for Thread Pool.
//
struct multipart_upload_part_req_thparam
{
std::string path;
std::string upload_id;
int upload_fd = -1;
off_t start = 0;
off_t size = 0;
bool is_copy = false;
int part_num = -1;
std::mutex* pthparam_lock = nullptr;
etagpair* petag = nullptr;
int* presult = nullptr;
};
//
// Complete Multipart Upload Request parameter structure for Thread Pool.
//
@ -200,6 +217,7 @@ void* put_req_threadworker(void* arg);
void* list_bucket_req_threadworker(void* arg);
void* check_service_req_threadworker(void* arg);
void* pre_multipart_upload_req_threadworker(void* arg);
void* multipart_upload_part_req_threadworker(void* arg);
void* complete_multipart_upload_threadworker(void* arg);
void* abort_multipart_upload_req_threadworker(void* arg);
void* multipart_put_head_req_threadworker(void* arg);
@ -217,6 +235,8 @@ int put_request(const std::string& strpath, const headers_t& meta, int fd, bool
int list_bucket_request(const std::string& strpath, const std::string& query, std::string& responseBody);
int check_service_request(const std::string& strpath, bool forceNoSSE, bool support_compat_dir, long& responseCode, std::string& responseBody);
int pre_multipart_upload_request(const std::string& path, const headers_t& meta, std::string& upload_id);
int multipart_upload_part_request(const std::string& path, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy, Semaphore* psem, std::mutex* pthparam_lock, int* req_result);
int await_multipart_upload_part_request(const std::string& path, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy);
int complete_multipart_upload_request(const std::string& path, const std::string& upload_id, const etaglist_t& parts);
int abort_multipart_upload_request(const std::string& path, const std::string& upload_id);
int multipart_put_head_request(const std::string& strfrom, const std::string& strto, off_t size, const headers_t& meta);