Refactored multipart put head request

This commit is contained in:
Takeshi Nakatani 2024-07-15 06:40:05 +00:00 committed by Andrew Gaul
parent a1e47bc287
commit 86b5c9d88e
7 changed files with 297 additions and 181 deletions

View File

@ -4252,7 +4252,7 @@ int S3fsCurl::MultipartUploadPartRequest(const char* tpath, int part_num, const
return result;
}
int S3fsCurl::CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta)
int S3fsCurl::CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta)
{
S3FS_PRN_INFO3("[from=%s][to=%s][part=%d]", SAFESTRPTR(from), SAFESTRPTR(to), part_num);
@ -4344,20 +4344,6 @@ bool S3fsCurl::MultipartUploadPartComplete()
return true;
}
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress constParameter
// cppcheck-suppress constParameterCallback
bool S3fsCurl::CopyMultipartUploadCallback(S3fsCurl* s3fscurl, void* param)
{
if(!s3fscurl || param){ // this callback does not need a parameter
return false;
}
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress knownConditionTrueFalse
return s3fscurl->CopyMultipartUploadComplete();
}
bool S3fsCurl::CopyMultipartUploadComplete()
{
std::string etag;
@ -4381,65 +4367,27 @@ bool S3fsCurl::MixMultipartUploadComplete()
return result;
}
int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta)
int S3fsCurl::MultipartPutHeadRequest(const std::string& from, const std::string& to, int part_number, const std::string& upload_id, const headers_t& meta)
{
int result;
std::string upload_id;
off_t chunk;
off_t bytes_remaining;
etaglist_t list;
S3FS_PRN_INFO3("[from=%s][to=%s][part_number=%d][upload_id=%s]", from.c_str(), to.c_str(), part_number, upload_id.c_str());
S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath));
int result;
if(0 != (result = PreMultipartUploadRequest(tpath, meta, upload_id))){
// setup
if(0 != (result = CopyMultipartUploadSetup(from.c_str(), to.c_str(), part_number, upload_id, meta))){
S3FS_PRN_ERR("failed multipart put head request setup(from=%s, to=%s, part_number=%d, upload_id=%s) : %d", from.c_str(), to.c_str(), part_number, upload_id.c_str(), result);
return result;
}
DestroyCurlHandle();
// Initialize S3fsMultiCurl
S3fsMultiCurl curlmulti(GetMaxParallelCount());
curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartUploadCallback);
curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartUploadRetryCallback);
for(bytes_remaining = size; 0 < bytes_remaining; bytes_remaining -= chunk){
chunk = bytes_remaining > GetMultipartCopySize() ? GetMultipartCopySize() : bytes_remaining;
std::ostringstream strrange;
strrange << "bytes=" << (size - bytes_remaining) << "-" << (size - bytes_remaining + chunk - 1);
meta["x-amz-copy-source-range"] = strrange.str();
// s3fscurl sub object
std::unique_ptr<S3fsCurl> s3fscurl_para(new S3fsCurl(true));
s3fscurl_para->b_from = SAFESTRPTR(tpath);
s3fscurl_para->b_meta = meta;
s3fscurl_para->partdata.add_etag_list(list);
// initiate upload part for parallel
if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}
// set into parallel object
if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){
S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
return -EIO;
}
if(!fpLazySetup || !fpLazySetup(this)){
S3FS_PRN_ERR("failed multipart put head request lazysetup(from=%s, to=%s, part_number=%d, upload_id=%s)", from.c_str(), to.c_str(), part_number, upload_id.c_str());
return -EIO;
}
// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
// request
if(0 != (result = RequestPerform())){
return result;
}
if(0 != (result = MultipartUploadComplete(tpath, upload_id, list))){
return result;
}
return 0;
}
@ -4466,75 +4414,6 @@ int S3fsCurl::MultipartUploadRequest(const std::string& upload_id, const char* t
return 0;
}
int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t& meta, off_t size)
{
int result;
std::string upload_id;
off_t chunk;
off_t bytes_remaining;
etaglist_t list;
S3FS_PRN_INFO3("[from=%s][to=%s]", SAFESTRPTR(from), SAFESTRPTR(to));
std::string srcresource;
std::string srcurl;
MakeUrlResource(get_realpath(from).c_str(), srcresource, srcurl);
meta["Content-Type"] = S3fsCurl::LookupMimeType(to);
meta["x-amz-copy-source"] = srcresource;
if(0 != (result = PreMultipartUploadRequest(to, meta, upload_id))){
return result;
}
DestroyCurlHandle();
// Initialize S3fsMultiCurl
S3fsMultiCurl curlmulti(GetMaxParallelCount());
curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartUploadCallback);
curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartUploadRetryCallback);
for(bytes_remaining = size; 0 < bytes_remaining; bytes_remaining -= chunk){
chunk = bytes_remaining > GetMultipartCopySize() ? GetMultipartCopySize() : bytes_remaining;
std::ostringstream strrange;
strrange << "bytes=" << (size - bytes_remaining) << "-" << (size - bytes_remaining + chunk - 1);
meta["x-amz-copy-source-range"] = strrange.str();
// s3fscurl sub object
std::unique_ptr<S3fsCurl> s3fscurl_para(new S3fsCurl(true));
s3fscurl_para->b_from = SAFESTRPTR(from);
s3fscurl_para->b_meta = meta;
s3fscurl_para->partdata.add_etag_list(list);
// initiate upload part for parallel
if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(from, to, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}
// set into parallel object
if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){
S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", to);
return -EIO;
}
}
// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(to), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
return result;
}
if(0 != (result = MultipartUploadComplete(to, upload_id, list))){
return result;
}
return 0;
}
/*
* Local variables:
* tab-width: 4

View File

@ -196,13 +196,13 @@ class S3fsCurl
size_t b_ssekey_pos; // backup for retrying
std::string b_ssevalue; // backup for retrying
sse_type_t b_ssetype; // backup for retrying
std::string b_from; // backup for retrying(for copy request)
std::string b_from; // backup for retrying(for copy request) ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.)
headers_t b_meta; // backup for retrying(for copy request)
std::string op; // the HTTP verb of the request ("PUT", "GET", etc.)
std::string query_string; // request query string
Semaphore *sem;
std::mutex *completed_tids_lock;
std::vector<std::thread::id> *completed_tids PT_GUARDED_BY(*completed_tids_lock);
std::mutex *completed_tids_lock; // ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.)
std::vector<std::thread::id> *completed_tids PT_GUARDED_BY(*completed_tids_lock); // ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.)
s3fscurl_lazy_setup fpLazySetup; // curl options for lazy setting function
CURLcode curlCode; // handle curl return
@ -241,7 +241,6 @@ class S3fsCurl
static size_t DownloadWriteCallback(void* ptr, size_t size, size_t nmemb, void* userp);
static bool MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param);
static bool CopyMultipartUploadCallback(S3fsCurl* s3fscurl, void* param);
static bool MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param);
static std::unique_ptr<S3fsCurl> MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl);
@ -280,7 +279,7 @@ class S3fsCurl
std::string CalcSignatureV2(const std::string& method, const std::string& strMD5, const std::string& content_type, const std::string& date, const std::string& resource, const std::string& secret_access_key, const std::string& access_token);
std::string CalcSignature(const std::string& method, const std::string& canonical_uri, const std::string& query_string, const std::string& strdate, const std::string& payload_hash, const std::string& date8601, const std::string& secret_access_key, const std::string& access_token);
int MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id);
int CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta);
int CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta);
bool MultipartUploadPartComplete();
bool CopyMultipartUploadComplete();
int MapPutErrorResponse(int result);
@ -385,9 +384,8 @@ class S3fsCurl
bool MixMultipartUploadComplete();
int MultipartListRequest(std::string& body);
int AbortMultipartUpload(const char* tpath, const std::string& upload_id);
int MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta);
int MultipartPutHeadRequest(const std::string& from, const std::string& to, int part_number, const std::string& upload_id, const headers_t& meta);
int MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, etagpair* petagpair);
int MultipartRenameRequest(const char* from, const char* to, headers_t& meta, off_t size);
// methods(variables)
const std::string& GetPath() const { return path; }

View File

@ -24,6 +24,7 @@
#include <cstdint>
#include <curl/curl.h>
#include <string>
#include "metaheader.h"
enum class sse_type_t : uint8_t;
@ -38,6 +39,7 @@ std::string get_header_value(const struct curl_slist* list, const std::string &k
bool MakeUrlResource(const char* realpath, std::string& resourcepath, std::string& url);
std::string prepare_url(const char* url);
bool get_object_sse_type(const char* path, sse_type_t& ssetype, std::string& ssevalue); // implement in s3fs.cpp
int put_headers(const char* path, const headers_t& meta, bool is_copy, bool use_st_size = true); // implement in s3fs.cpp
bool make_md5_from_binary(const char* pstr, size_t length, std::string& md5);
std::string url_to_host(const std::string &url);

View File

@ -2513,9 +2513,6 @@ bool FdEntity::MergeOrgMeta(headers_t& updatemeta)
return (pending_status_t::NO_UPDATE_PENDING != pending_status);
}
// global function in s3fs.cpp
int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_size = true);
int FdEntity::UploadPendingHasLock(int fd)
{
int result;

View File

@ -44,6 +44,7 @@
#include "fdcache_stat.h"
#include "curl.h"
#include "curl_multi.h"
#include "curl_util.h"
#include "s3objlist.h"
#include "cache.h"
#include "addhead.h"
@ -110,11 +111,6 @@ static bool update_parent_dir_stat= false; // default not updating parent direc
static fsblkcnt_t bucket_block_count; // advertised block count of the bucket
static unsigned long s3fs_block_size = 16 * 1024 * 1024; // s3fs block size is 16MB
//-------------------------------------------------------------------
// Global functions : prototype
//-------------------------------------------------------------------
int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_size = true); // [NOTE] global function because this is called from FdEntity class
//-------------------------------------------------------------------
// Static functions : prototype
//-------------------------------------------------------------------
@ -849,7 +845,7 @@ static int get_local_fent(AutoFdEntity& autoent, FdEntity **entity, const char*
// create or update s3 meta
// @return fuse return code
//
int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_size)
int put_headers(const char* path, const headers_t& meta, bool is_copy, bool use_st_size)
{
int result;
off_t size;
@ -876,11 +872,7 @@ int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_siz
}
if(!nocopyapi && !nomultipart && size >= multipart_threshold){
// [TODO]
// This object will be removed after removing S3fsMultiCurl
//
S3fsCurl s3fscurl(true);
if(0 != (result = s3fscurl.MultipartHeadRequest(strpath.c_str(), size, meta))){
if(0 != (result = multipart_put_head_request(strpath, strpath, size, meta))){
return result;
}
}else{
@ -1560,14 +1552,9 @@ static int rename_large_object(const char* from, const char* to)
return result;
}
// [TODO]
// This object will be removed after removing S3fsMultiCurl
//
S3fsCurl s3fscurl(true);
if(0 != (result = s3fscurl.MultipartRenameRequest(from, to, meta, buf.st_size))){
if(0 != (result = multipart_put_head_request(std::string(from), std::string(to), buf.st_size, meta))){
return result;
}
s3fscurl.DestroyCurlHandle();
// Rename cache file
FdManager::get()->Rename(from, to);
@ -3124,27 +3111,10 @@ static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf
continue;
}
// parameter for thread worker
auto* thargs = new multi_head_req_thparam; // free in multi_head_req_threadworker
thargs->psyncfiller = &syncfiller;
thargs->pthparam_lock = &thparam_lock; // for pretrycount and presult member
thargs->pretrycount = &retrycount;
thargs->pnotfound_list = &notfound_list;
thargs->use_wtf8 = use_wtf8;
thargs->path = disppath;
thargs->presult = &req_result;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = thargs;
ppoolparam.psem = &multi_head_sem;
ppoolparam.pfunc = multi_head_req_threadworker;
// setup instruction
if(!ThreadPoolMan::Instruct(ppoolparam)){
S3FS_PRN_ERR("failed setup instruction for one header request.");
delete thargs;
return -EIO;
// set one head request
int result;
if(0 != (result = multi_head_request(disppath, syncfiller, thparam_lock, retrycount, notfound_list, use_wtf8, req_result, multi_head_sem))){
return result;
}
++req_count;
}

View File

@ -18,6 +18,8 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <sstream>
#include "common.h"
#include "s3fs.h"
#include "s3fs_threadreqs.h"
@ -354,6 +356,123 @@ void* get_object_req_threadworker(void* arg)
return reinterpret_cast<void*>(pthparam->result);
}
//
// Thread Worker function for multipart put head request
//
void* multipart_put_head_req_threadworker(void* arg)
{
auto* pthparam = static_cast<multipart_put_head_req_thparam*>(arg);
if(!pthparam || !pthparam->ppartdata || !pthparam->pthparam_lock || !pthparam->pretrycount || !pthparam->presult){
return reinterpret_cast<void*>(-EIO);
}
// Check retry max count and print debug message
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
S3FS_PRN_INFO3("Multipart Put Head Request [from=%s][to=%s][upload_id=%s][part_number=%d][filepart=%p][thparam_lock=%p][retrycount=%d][from=%s][to=%s]", pthparam->from.c_str(), pthparam->to.c_str(), pthparam->upload_id.c_str(), pthparam->part_number, pthparam->ppartdata, pthparam->pthparam_lock, *(pthparam->pretrycount), pthparam->from.c_str(), pthparam->to.c_str());
if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){
S3FS_PRN_ERR("Multipart Put Head request(%s->%s) reached the maximum number of retry count(%d).", pthparam->from.c_str(), pthparam->to.c_str(), *(pthparam->pretrycount));
return reinterpret_cast<void*>(-EIO);
}
}
S3fsCurl s3fscurl(true);
int result = 0;
while(true){
// Request
result = s3fscurl.MultipartPutHeadRequest(pthparam->from, pthparam->to, pthparam->part_number, pthparam->upload_id, pthparam->meta);
// Check result
bool isResetOffset= true;
CURLcode curlCode = s3fscurl.GetCurlCode();
long responseCode = S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET;
s3fscurl.GetResponseCode(responseCode, false);
if(CURLE_OK == curlCode){
if(responseCode < 400){
// add into stat cache
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
std::string etag;
pthparam->ppartdata->uploaded = simple_parse_xml(s3fscurl.GetBodyData().c_str(), s3fscurl.GetBodyData().size(), "ETag", etag);
pthparam->ppartdata->petag->etag = peeloff(etag);
}
result = 0;
break;
}else if(responseCode == 400){
// as possibly in multipart
S3FS_PRN_WARN("Put Head Request(%s->%s) got 400 response code.", pthparam->from.c_str(), pthparam->to.c_str());
}else if(responseCode == 404){
// set path to not found list
S3FS_PRN_WARN("Put Head Request(%s->%s) got 404 response code.", pthparam->from.c_str(), pthparam->to.c_str());
break;
}else if(responseCode == 500){
// case of all other result, do retry.(11/13/2013)
// because it was found that s3fs got 500 error from S3, but could success
// to retry it.
S3FS_PRN_WARN("Put Head Request(%s->%s) got 500 response code.", pthparam->from.c_str(), pthparam->to.c_str());
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress knownConditionTrueFalse
}else if(responseCode == S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET){
// This is a case where the processing result has not yet been updated (should be very rare).
S3FS_PRN_WARN("Put Head Request(%s->%s) could not get any response code.", pthparam->from.c_str(), pthparam->to.c_str());
}else{ // including S3fsCurl::S3FSCURL_RESPONSECODE_FATAL_ERROR
// Retry in other case.
S3FS_PRN_WARN("Put Head Request(%s->%s) got fatal response code.", pthparam->from.c_str(), pthparam->to.c_str());
}
}else if(CURLE_OPERATION_TIMEDOUT == curlCode){
S3FS_PRN_ERR("Put Head Request(%s->%s) is timeouted.", pthparam->from.c_str(), pthparam->to.c_str());
isResetOffset= false;
}else if(CURLE_PARTIAL_FILE == curlCode){
S3FS_PRN_WARN("Put Head Request(%s->%s) is recieved data does not match the given size.", pthparam->from.c_str(), pthparam->to.c_str());
isResetOffset= false;
}else{
S3FS_PRN_WARN("Put Head Request(%s->%s) got the result code(%d: %s)", pthparam->from.c_str(), pthparam->to.c_str(), curlCode, curl_easy_strerror(curlCode));
}
// Check retry max count
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
++(*(pthparam->pretrycount));
if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){
S3FS_PRN_ERR("Put Head Request(%s->%s) reached the maximum number of retry count(%d).", pthparam->from.c_str(), pthparam->to.c_str(), *(pthparam->pretrycount));
if(0 == result){
result = -EIO;
}
break;
}
}
// Setup for retry
if(isResetOffset){
S3fsCurl::ResetOffset(&s3fscurl);
}
}
// Set result code
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
if(0 == *(pthparam->presult) && 0 != result){
// keep first error
*(pthparam->presult) = result;
}
}
return reinterpret_cast<void*>(result);
}
//-------------------------------------------------------------------
// Utility functions
//-------------------------------------------------------------------
@ -387,6 +506,36 @@ int head_request(const std::string& strpath, headers_t& header)
return 0;
}
//
// Calls S3fsCurl::HeadRequest via multi_head_req_threadworker
//
int multi_head_request(const std::string& strpath, SyncFiller& syncfiller, std::mutex& thparam_lock, int& retrycount, s3obj_list_t& notfound_list, bool use_wtf8, int& result, Semaphore& sem)
{
// parameter for thread worker
auto* thargs = new multi_head_req_thparam; // free in multi_head_req_threadworker
thargs->path = strpath;
thargs->psyncfiller = &syncfiller;
thargs->pthparam_lock = &thparam_lock; // for pretrycount and presult member
thargs->pretrycount = &retrycount;
thargs->pnotfound_list = &notfound_list;
thargs->use_wtf8 = use_wtf8;
thargs->presult = &result;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = thargs;
ppoolparam.psem = &sem;
ppoolparam.pfunc = multi_head_req_threadworker;
// setup instruction
if(!ThreadPoolMan::Instruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Multi Head Request Thread Worker [path=%s]", strpath.c_str());
delete thargs;
return -EIO;
}
return 0;
}
//
// Calls S3fsCurl::DeleteRequest via delete_req_threadworker
//
@ -633,6 +782,107 @@ int abort_multipart_upload_request(const std::string& path, const std::string& u
return 0;
}
//
// Calls S3fsCurl::MultipartPutHeadRequest via multipart_put_head_req_threadworker
//
int multipart_put_head_request(const std::string& strfrom, const std::string& strto, off_t size, const headers_t& meta)
{
S3FS_PRN_INFO3("[from=%s][to=%s]", strfrom.c_str(), strto.c_str());
bool is_rename = (strfrom != strto);
int result;
std::string upload_id;
off_t chunk;
off_t bytes_remaining;
etaglist_t list;
// Prepare additional header information for rename
std::string contenttype;
std::string srcresource;
if(is_rename){
std::string srcurl; // this is not used
MakeUrlResource(get_realpath(strfrom.c_str()).c_str(), srcresource, srcurl);
contenttype = S3fsCurl::LookupMimeType(strto);
}
// get upload_id
if(0 != (result = pre_multipart_upload_request(strto, meta, upload_id))){
return result;
}
// common variables
Semaphore multi_head_sem(0);
std::mutex thparam_lock;
filepart partdata;
int req_count = 0;
int retrycount = 0;
int req_result = 0;
for(bytes_remaining = size; 0 < bytes_remaining; bytes_remaining -= chunk){
chunk = bytes_remaining > S3fsCurl::GetMultipartCopySize() ? S3fsCurl::GetMultipartCopySize() : bytes_remaining;
partdata.add_etag_list(list);
// parameter for thread worker
auto* thargs = new multipart_put_head_req_thparam; // free in multipart_put_head_req_threadworker
thargs->from = strfrom;
thargs->to = strto;
thargs->upload_id = upload_id;
thargs->part_number = partdata.get_part_number();
thargs->meta = meta;
thargs->pthparam_lock = &thparam_lock;
thargs->ppartdata = &partdata;
thargs->pretrycount = &retrycount;
thargs->presult = &req_result;
std::ostringstream strrange;
strrange << "bytes=" << (size - bytes_remaining) << "-" << (size - bytes_remaining + chunk - 1);
thargs->meta["x-amz-copy-source-range"] = strrange.str();
if(is_rename){
thargs->meta["Content-Type"] = contenttype;
thargs->meta["x-amz-copy-source"] = srcresource;
}
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = thargs;
ppoolparam.psem = &multi_head_sem;
ppoolparam.pfunc = multipart_put_head_req_threadworker;
// setup instruction
if(!ThreadPoolMan::Instruct(ppoolparam)){
S3FS_PRN_ERR("failed setup instruction for one header request.");
delete thargs;
return -EIO;
}
++req_count;
}
// wait for finish all requests
while(req_count > 0){
multi_head_sem.acquire();
--req_count;
}
// check result
if(0 != req_result){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", req_result);
int result2;
if(0 != (result2 = abort_multipart_upload_request(strto, upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
return req_result;
}
// completion process
if(0 != (result = complete_multipart_upload_request(strto, upload_id, list))){
return result;
}
return 0;
}
//
// Calls S3fsCurl::GetObjectRequest via get_object_req_threadworker
//

View File

@ -28,6 +28,7 @@
#include "curl.h"
#include "s3objlist.h"
#include "syncfiller.h"
#include "psemaphore.h"
//-------------------------------------------------------------------
// Structures for MultiThread Request
@ -144,6 +145,22 @@ struct abort_multipart_upload_req_thparam
int result = 0;
};
//
// Multipart Put Head Request parameter structure for Thread Pool.
//
struct multipart_put_head_req_thparam
{
std::string from;
std::string to;
std::string upload_id;
int part_number = 0;
headers_t meta;
std::mutex* pthparam_lock = nullptr;
filepart* ppartdata = nullptr;
int* pretrycount = nullptr;
int* presult = nullptr;
};
//
// Get Object Request parameter structure for Thread Pool.
//
@ -169,12 +186,14 @@ void* check_service_req_threadworker(void* arg);
void* pre_multipart_upload_req_threadworker(void* arg);
void* complete_multipart_upload_threadworker(void* arg);
void* abort_multipart_upload_req_threadworker(void* arg);
void* multipart_put_head_req_threadworker(void* arg);
void* get_object_req_threadworker(void* arg);
//-------------------------------------------------------------------
// Utility functions
//-------------------------------------------------------------------
int head_request(const std::string& strpath, headers_t& header);
int multi_head_request(const std::string& strpath, SyncFiller& syncfiller, std::mutex& thparam_lock, int& retrycount, s3obj_list_t& notfound_list, bool use_wtf8, int& result, Semaphore& sem);
int delete_request(const std::string& strpath);
int put_head_request(const std::string& strpath, const headers_t& meta, bool is_copy);
int put_request(const std::string& strpath, const headers_t& meta, int fd, bool ahbe);
@ -183,6 +202,7 @@ int check_service_request(const std::string& strpath, bool forceNoSSE, bool supp
int pre_multipart_upload_request(const std::string& path, const headers_t& meta, std::string& upload_id);
int complete_multipart_upload_request(const std::string& path, const std::string& upload_id, const etaglist_t& parts);
int abort_multipart_upload_request(const std::string& path, const std::string& upload_id);
int multipart_put_head_request(const std::string& strfrom, const std::string& strto, off_t size, const headers_t& meta);
int get_object_request(const std::string& path, int fd, off_t start, off_t size);
//-------------------------------------------------------------------