Removed the last use of S3fsMultiCurl and changed the associated thread handling

This commit is contained in:
Takeshi Nakatani 2024-07-15 06:40:05 +00:00 committed by Andrew Gaul
parent 499577c2a9
commit bfc3ea767a
9 changed files with 222 additions and 827 deletions

View File

@ -36,7 +36,6 @@ s3fs_SOURCES = \
metaheader.cpp \
mpu_util.cpp \
curl.cpp \
curl_multi.cpp \
curl_util.cpp \
s3objlist.cpp \
cache.cpp \

View File

@ -35,7 +35,6 @@
#include "s3fs.h"
#include "s3fs_logger.h"
#include "curl.h"
#include "curl_multi.h"
#include "curl_util.h"
#include "s3fs_auth.h"
#include "s3fs_cred.h"
@ -1219,132 +1218,6 @@ bool S3fsCurl::SetIPResolveType(const char* value)
return true;
}
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress constParameter
// cppcheck-suppress constParameterCallback
bool S3fsCurl::MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param)
{
    // Success callback for one multipart upload part. This callback takes no
    // parameter, so a non-null param (as well as a null object) is an error.
    return (s3fscurl && !param) ? s3fscurl->MultipartUploadPartComplete() : false;
}
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress constParameter
// cppcheck-suppress constParameterCallback
bool S3fsCurl::MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param)
{
    // Success callback for one part of a mixed (upload + copy) multipart
    // upload. No parameter is expected; a non-null param is an error.
    return (s3fscurl && !param) ? s3fscurl->MultipartUploadPartComplete() : false;
}
// Retry callback for a failed upload-part request.
// Parses uploadId/partNumber back out of the failed request's URL and builds
// a fresh S3fsCurl for the same part (same fd, offset and size) with
// retry_count incremented.
// Returns the new request object, or nullptr when the URL cannot be parsed
// or the retry limit has been reached (the caller treats nullptr as EIO).
std::unique_ptr<S3fsCurl> S3fsCurl::MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl)
{
    if(!s3fscurl){
        return nullptr;
    }
    // parse and get part_num, upload_id.
    std::string upload_id;
    std::string part_num_str;
    int part_num;
    off_t tmp_part_num = 0;
    if(!get_keyword_value(s3fscurl->url, "uploadId", upload_id)){
        return nullptr;
    }
    upload_id = urlDecode(upload_id); // decode
    if(!get_keyword_value(s3fscurl->url, "partNumber", part_num_str)){
        return nullptr;
    }
    if(!s3fs_strtoofft(&tmp_part_num, part_num_str.c_str(), /*base=*/ 10)){
        return nullptr;
    }
    part_num = static_cast<int>(tmp_part_num);

    if(s3fscurl->retry_count >= S3fsCurl::retries){
        S3FS_PRN_ERR("Over retry count(%d) limit(%s:%d).", s3fscurl->retry_count, s3fscurl->path.c_str(), part_num);
        return nullptr;
    }

    // duplicate request: copy the part description and its backup fields so
    // the new object uploads exactly the same byte range.
    std::unique_ptr<S3fsCurl> newcurl(new S3fsCurl(s3fscurl->IsUseAhbe()));
    newcurl->partdata.petag = s3fscurl->partdata.petag;
    newcurl->partdata.fd = s3fscurl->partdata.fd;
    newcurl->partdata.startpos = s3fscurl->b_partdata_startpos;
    newcurl->partdata.size = s3fscurl->b_partdata_size;
    newcurl->b_partdata_startpos = s3fscurl->b_partdata_startpos;
    newcurl->b_partdata_size = s3fscurl->b_partdata_size;
    newcurl->retry_count = s3fscurl->retry_count + 1;
    newcurl->op = s3fscurl->op;
    newcurl->type = s3fscurl->type;

    // setup new curl object
    if(0 != newcurl->MultipartUploadContentPartSetup(s3fscurl->path.c_str(), part_num, upload_id)){
        S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num);
        return nullptr;
    }
    return newcurl;
}
// Retry callback for a failed copy-part (server-side copy) request.
// Like MultipartUploadPartRetryCallback, but the new request is rebuilt from
// the backed-up copy source (b_from) and headers (b_meta) instead of a local
// file descriptor.
// Returns the new request object, or nullptr on parse failure / retry limit.
std::unique_ptr<S3fsCurl> S3fsCurl::CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl)
{
    if(!s3fscurl){
        return nullptr;
    }
    // parse and get part_num, upload_id.
    std::string upload_id;
    std::string part_num_str;
    int part_num;
    off_t tmp_part_num = 0;
    if(!get_keyword_value(s3fscurl->url, "uploadId", upload_id)){
        return nullptr;
    }
    upload_id = urlDecode(upload_id); // decode
    if(!get_keyword_value(s3fscurl->url, "partNumber", part_num_str)){
        return nullptr;
    }
    if(!s3fs_strtoofft(&tmp_part_num, part_num_str.c_str(), /*base=*/ 10)){
        return nullptr;
    }
    part_num = static_cast<int>(tmp_part_num);

    if(s3fscurl->retry_count >= S3fsCurl::retries){
        S3FS_PRN_ERR("Over retry count(%d) limit(%s:%d).", s3fscurl->retry_count, s3fscurl->path.c_str(), part_num);
        return nullptr;
    }

    // duplicate request
    std::unique_ptr<S3fsCurl> newcurl(new S3fsCurl(s3fscurl->IsUseAhbe()));
    newcurl->partdata.petag = s3fscurl->partdata.petag;
    newcurl->b_from = s3fscurl->b_from;
    newcurl->b_meta = s3fscurl->b_meta;
    newcurl->retry_count = s3fscurl->retry_count + 1;
    newcurl->op = s3fscurl->op;
    newcurl->type = s3fscurl->type;

    // setup new curl object
    if(0 != newcurl->MultipartUploadCopyPartSetup(s3fscurl->b_from.c_str(), s3fscurl->path.c_str(), part_num, upload_id, s3fscurl->b_meta)){
        S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num);
        return nullptr;
    }
    return newcurl;
}
std::unique_ptr<S3fsCurl> S3fsCurl::MixMultipartUploadRetryCallback(S3fsCurl* s3fscurl)
{
    // Dispatch the retry to the copy-part handler when the failed part has
    // no local file descriptor (fd == -1, i.e. a server-side copy part),
    // otherwise to the upload-part handler.
    if(!s3fscurl){
        return nullptr;
    }
    return (-1 == s3fscurl->partdata.fd) ? S3fsCurl::CopyMultipartUploadRetryCallback(s3fscurl)
                                         : S3fsCurl::MultipartUploadPartRetryCallback(s3fscurl);
}
int S3fsCurl::MapPutErrorResponse(int result)
{
if(result != 0){
@ -1372,185 +1245,6 @@ int S3fsCurl::MapPutErrorResponse(int result)
return result;
}
// Upload the whole file behind fd to tpath as a parallel multipart upload.
// Starts the upload (pre_multipart_upload_request), sends each chunk as a
// part through S3fsMultiCurl worker threads, then completes the upload, or
// aborts it if any part fails.
// Returns 0 on success or a negative errno value.
int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd)
{
    int result;
    std::string upload_id;
    struct stat st;
    etaglist_t list;
    off_t remaining_bytes;

    S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd);

    if(-1 == fstat(fd, &st)){
        S3FS_PRN_ERR("Invalid file descriptor(errno=%d)", errno);
        return -errno;
    }

    if(0 != (result = pre_multipart_upload_request(std::string(tpath), meta, upload_id))){
        return result;
    }

    // Initialize S3fsMultiCurl
    S3fsMultiCurl curlmulti(GetMaxParallelCount());
    curlmulti.SetSuccessCallback(S3fsCurl::MultipartUploadPartCallback);
    curlmulti.SetRetryCallback(S3fsCurl::MultipartUploadPartRetryCallback);

    // cycle through open fd, pulling off 10MB chunks at a time
    for(remaining_bytes = st.st_size; 0 < remaining_bytes; ){
        off_t chunk = remaining_bytes > S3fsCurl::multipart_size ? S3fsCurl::multipart_size : remaining_bytes;

        // s3fscurl sub object
        std::unique_ptr<S3fsCurl> s3fscurl_para(new S3fsCurl(true));
        s3fscurl_para->partdata.fd = fd;
        s3fscurl_para->partdata.startpos = st.st_size - remaining_bytes;
        s3fscurl_para->partdata.size = chunk;
        // b_* fields back up the part description so a retry can rebuild it
        s3fscurl_para->b_partdata_startpos = s3fscurl_para->partdata.startpos;
        s3fscurl_para->b_partdata_size = s3fscurl_para->partdata.size;
        s3fscurl_para->partdata.add_etag_list(list);

        // initiate upload part for parallel
        if(0 != (result = s3fscurl_para->MultipartUploadContentPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
            S3FS_PRN_ERR("failed uploading part setup(%d)", result);
            return result;
        }

        // set into parallel object
        if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){
            S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
            return -EIO;
        }
        remaining_bytes -= chunk;
    }

    // Multi request
    if(0 != (result = curlmulti.Request())){
        S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);

        // On failure, abort the multipart upload so S3 does not keep the
        // incomplete parts around.
        int result2;
        if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
            S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
        }
        return result;
    }

    if(0 != (result = complete_multipart_upload_request(std::string(tpath), upload_id, list))){
        return result;
    }
    return 0;
}
// Flush a partially-modified file to tpath using a mixed multipart upload:
// modified pages are uploaded from the local fd, unmodified pages are copied
// server-side from the existing object (x-amz-copy-source). All parts run in
// parallel through S3fsMultiCurl, then the upload is completed, or aborted
// if anything fails.
// Returns 0 on success or a negative errno value.
int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages)
{
    int result;
    std::string upload_id;
    struct stat st;
    etaglist_t list;

    S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd);

    if(-1 == fstat(fd, &st)){
        S3FS_PRN_ERR("Invalid file descriptor(errno=%d)", errno);
        return -errno;
    }

    if(0 != (result = pre_multipart_upload_request(std::string(tpath), meta, upload_id))){
        return result;
    }

    // for copy multipart
    std::string srcresource;
    std::string srcurl;
    MakeUrlResource(get_realpath(tpath).c_str(), srcresource, srcurl);
    meta["Content-Type"] = S3fsCurl::LookupMimeType(tpath);
    meta["x-amz-copy-source"] = srcresource;

    // Initialize S3fsMultiCurl
    S3fsMultiCurl curlmulti(GetMaxParallelCount());
    curlmulti.SetSuccessCallback(S3fsCurl::MixMultipartUploadCallback);
    curlmulti.SetRetryCallback(S3fsCurl::MixMultipartUploadRetryCallback);

    for(auto iter = mixuppages.cbegin(); iter != mixuppages.cend(); ++iter){
        if(iter->modified){
            // Multipart upload: the page was modified locally, send it from fd.
            std::unique_ptr<S3fsCurl> s3fscurl_para(new S3fsCurl(true));

            s3fscurl_para->partdata.fd = fd;
            s3fscurl_para->partdata.startpos = iter->offset;
            s3fscurl_para->partdata.size = iter->bytes;
            s3fscurl_para->b_partdata_startpos = s3fscurl_para->partdata.startpos;
            s3fscurl_para->b_partdata_size = s3fscurl_para->partdata.size;
            s3fscurl_para->partdata.add_etag_list(list);

            S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(iter->offset), static_cast<long long>(iter->bytes), s3fscurl_para->partdata.get_part_number());

            // initiate upload part for parallel
            if(0 != (result = s3fscurl_para->MultipartUploadContentPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
                S3FS_PRN_ERR("failed uploading part setup(%d)", result);
                return result;
            }

            // set into parallel object
            if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){
                S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
                return -EIO;
            }
        }else{
            // Multipart copy: unmodified page, copy it server-side. A single
            // page may be split into several copy parts limited by
            // GetMultipartCopySize().
            for(off_t i = 0, bytes = 0; i < iter->bytes; i += bytes){
                std::unique_ptr<S3fsCurl> s3fscurl_para(new S3fsCurl(true));

                bytes = std::min(GetMultipartCopySize(), iter->bytes - i);
                /* every part should be larger than MIN_MULTIPART_SIZE and smaller than FIVE_GB */
                off_t remain_bytes = iter->bytes - i - bytes;

                if ((MIN_MULTIPART_SIZE > remain_bytes) && (0 < remain_bytes)){
                    // the tail would be too small to be its own part: either
                    // split the remainder evenly or absorb it into this part.
                    if(FIVE_GB < (bytes + remain_bytes)){
                        bytes = (bytes + remain_bytes)/2;
                    } else{
                        bytes += remain_bytes;
                    }
                }

                std::ostringstream strrange;
                strrange << "bytes=" << (iter->offset + i) << "-" << (iter->offset + i + bytes - 1);
                meta["x-amz-copy-source-range"] = strrange.str();

                s3fscurl_para->b_from = SAFESTRPTR(tpath);
                s3fscurl_para->b_meta = meta;
                s3fscurl_para->partdata.add_etag_list(list);

                S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(iter->offset + i), static_cast<long long>(bytes), s3fscurl_para->partdata.get_part_number());

                // initiate upload part for parallel
                if(0 != (result = s3fscurl_para->MultipartUploadCopyPartSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
                    S3FS_PRN_ERR("failed uploading part setup(%d)", result);
                    return result;
                }

                // set into parallel object
                if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){
                    S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
                    return -EIO;
                }
            }
        }
    }

    // Multi request
    if(0 != (result = curlmulti.Request())){
        S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);

        int result2;
        if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
            S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
        }
        return result;
    }

    if(0 != (result = complete_multipart_upload_request(std::string(tpath), upload_id, list))){
        return result;
    }
    return 0;
}
bool S3fsCurl::MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
@ -1794,8 +1488,7 @@ S3fsCurl::S3fsCurl(bool ahbe) :
type(REQTYPE::UNSET), requestHeaders(nullptr),
LastResponseCode(S3FSCURL_RESPONSECODE_NOTSET), postdata(nullptr), postdata_remaining(0), is_use_ahbe(ahbe),
retry_count(0), b_postdata(nullptr), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0),
b_ssekey_pos(-1), b_ssetype(sse_type_t::SSE_DISABLE),
sem(nullptr), completed_tids_lock(nullptr), completed_tids(nullptr), fpLazySetup(nullptr), curlCode(CURLE_OK)
fpLazySetup(nullptr), curlCode(CURLE_OK)
{
if(!S3fsCurl::ps3fscred){
S3FS_PRN_CRIT("The object of S3fs Credential class is not initialized.");
@ -2078,7 +1771,7 @@ bool S3fsCurl::RemakeHandle()
headdata.clear();
LastResponseCode = S3FSCURL_RESPONSECODE_NOTSET;
// count up(only use for multipart)
// retry count up
retry_count++;
// set from backup
@ -3194,7 +2887,6 @@ bool S3fsCurl::PreHeadRequest(const char* tpath, size_t ssekey_pos)
return false;
}
}
b_ssekey_pos = ssekey_pos;
op = "HEAD";
type = REQTYPE::HEAD;
@ -3533,9 +3225,6 @@ int S3fsCurl::PreGetObjectRequest(const char* tpath, int fd, off_t start, off_t
partdata.size = size;
b_partdata_startpos = start;
b_partdata_size = size;
b_ssetype = ssetype;
b_ssevalue = ssevalue;
b_ssekey_pos = -1; // not use this value for get object.
return 0;
}
@ -3849,8 +3538,6 @@ int S3fsCurl::MultipartUploadPartSetup(const char* tpath, int upload_fd, off_t s
strrange << "bytes=" << start << "-" << (start + size - 1);
meta["x-amz-copy-source-range"] = strrange.str();
b_from = SAFESTRPTR(tpath);
b_meta = meta;
partdata.set_etag(petag); // [NOTE] be careful, the value is set directly
S3FS_PRN_INFO3("Copy Part [path=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long int>(start), static_cast<long long int>(size), part_num);

View File

@ -34,7 +34,6 @@
#include "common.h"
#include "fdcache_page.h"
#include "metaheader.h"
#include "psemaphore.h"
#include "s3fs_util.h"
#include "types.h"
@ -97,10 +96,6 @@ typedef std::vector<sseckeymap_t> sseckeylist_t;
//
class S3fsCurl
{
// [TODO]
// If S3fsMultiCurl is discontinued, the following friends will be deleted.
friend class S3fsMultiCurl;
private:
enum class REQTYPE : int8_t {
UNSET = -1,
@ -187,22 +182,14 @@ class S3fsCurl
off_t postdata_remaining; // use by post method and read callback function.
filepart partdata; // use by multipart upload/get object callback
bool is_use_ahbe; // additional header by extension
int retry_count; // retry count for multipart ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.)
int retry_count; // retry count, this is used only sleep time before retring
std::unique_ptr<FILE, decltype(&s3fs_fclose)> b_infile = {nullptr, &s3fs_fclose}; // backup for retrying
const unsigned char* b_postdata; // backup for retrying
off_t b_postdata_remaining; // backup for retrying
off_t b_partdata_startpos; // backup for retrying
off_t b_partdata_size; // backup for retrying
size_t b_ssekey_pos; // backup for retrying
std::string b_ssevalue; // backup for retrying
sse_type_t b_ssetype; // backup for retrying
std::string b_from; // backup for retrying(for copy request) ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.)
headers_t b_meta; // backup for retrying(for copy request)
std::string op; // the HTTP verb of the request ("PUT", "GET", etc.)
std::string query_string; // request query string
Semaphore *sem;
std::mutex *completed_tids_lock; // ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.)
std::vector<std::thread::id> *completed_tids PT_GUARDED_BY(*completed_tids_lock); // ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.)
s3fscurl_lazy_setup fpLazySetup; // curl options for lazy setting function
CURLcode curlCode; // handle curl return
@ -240,12 +227,6 @@ class S3fsCurl
static size_t UploadReadCallback(void *ptr, size_t size, size_t nmemb, void *userp);
static size_t DownloadWriteCallback(void* ptr, size_t size, size_t nmemb, void* userp);
static bool MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param);
static bool MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param);
static std::unique_ptr<S3fsCurl> MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> MixMultipartUploadRetryCallback(S3fsCurl* s3fscurl);
// lazy functions for set curl options
static bool MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl);
static bool CopyMultipartUploadSetCurlOpts(S3fsCurl* s3fscurl);
@ -289,8 +270,6 @@ class S3fsCurl
static bool InitCredentialObject(S3fsCred* pcredobj);
static bool InitMimeType(const std::string& strFile);
static bool DestroyS3fsCurl();
static int ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd);
static int ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages);
// class methods(variables)
static std::string LookupMimeType(const std::string& name);

View File

@ -1,386 +0,0 @@
/*
* s3fs - FUSE-based file system backed by Amazon S3
*
* Copyright(C) 2007 Randy Rizun <rrizun@gmail.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <cstdio>
#include <cstdlib>
#include <cerrno>
#include <future>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
#include "s3fs.h"
#include "s3fs_logger.h"
#include "curl_multi.h"
#include "curl.h"
#include "psemaphore.h"
//-------------------------------------------------------------------
// Class S3fsMultiCurl
//-------------------------------------------------------------------
// Construct a multi-request driver.
// maxParallelism : upper bound on concurrently running request threads
// not_abort      : when true, keep processing remaining requests even after
//                  one of them fails (the error is still returned)
S3fsMultiCurl::S3fsMultiCurl(int maxParallelism, bool not_abort) : maxParallelism(maxParallelism), not_abort(not_abort), SuccessCallback(nullptr), NotFoundCallback(nullptr), RetryCallback(nullptr), pSuccessCallbackParam(nullptr), pNotFoundCallbackParam(nullptr)
{
}
// Destroy all queued and in-flight request objects and their curl handles.
S3fsMultiCurl::~S3fsMultiCurl()
{
    Clear();
}
// Destroy the curl handles of all in-flight requests (clist_req) and, when
// is_all is true, of the not-yet-sent requests (clist_all) as well.
// Always returns true (kept for interface compatibility).
bool S3fsMultiCurl::ClearEx(bool is_all)
{
    for(const auto& req : clist_req){
        if(S3fsCurl* s3fscurl = req.get()){
            s3fscurl->DestroyCurlHandle();
        }
    }
    clist_req.clear();

    if(is_all){
        for(const auto& req : clist_all){
            // [FIX] guard against a null entry, matching the clist_req loop
            // above; the original dereferenced the pointer unconditionally.
            if(S3fsCurl* s3fscurl = req.get()){
                s3fscurl->DestroyCurlHandle();
            }
        }
        clist_all.clear();
    }

    S3FS_MALLOCTRIM(0);

    return true;
}
// Install a new success callback and hand back the previously installed one.
S3fsMultiSuccessCallback S3fsMultiCurl::SetSuccessCallback(S3fsMultiSuccessCallback function)
{
    return std::exchange(SuccessCallback, function);
}
// Install a new not-found callback and hand back the previously installed one.
S3fsMultiNotFoundCallback S3fsMultiCurl::SetNotFoundCallback(S3fsMultiNotFoundCallback function)
{
    return std::exchange(NotFoundCallback, function);
}
// Install a new retry callback and hand back the previously installed one.
S3fsMultiRetryCallback S3fsMultiCurl::SetRetryCallback(S3fsMultiRetryCallback function)
{
    return std::exchange(RetryCallback, function);
}
// Set the opaque argument passed to the success callback; returns the old one.
void* S3fsMultiCurl::SetSuccessCallbackParam(void* param)
{
    return std::exchange(pSuccessCallbackParam, param);
}
// Set the opaque argument passed to the not-found callback; returns the old one.
void* S3fsMultiCurl::SetNotFoundCallbackParam(void* param)
{
    return std::exchange(pNotFoundCallbackParam, param);
}
// Queue a prepared request object for the next Request() round.
// Returns false (and queues nothing) when the object is null.
bool S3fsMultiCurl::SetS3fsCurlObject(std::unique_ptr<S3fsCurl> s3fscurl)
{
    if(s3fscurl){
        clist_all.push_back(std::move(s3fscurl));
        return true;
    }
    return false;
}
// Run every request in clist_req on its own thread, bounding the number of
// concurrently running threads with a counting semaphore of size
// GetMaxParallelism().
// Returns 0 on success, or the last non-ignorable error returned by a worker.
int S3fsMultiCurl::MultiPerform()
{
    std::map<std::thread::id, std::pair<std::thread, std::future<int>>> threads;
    int result = 0;
    bool isMultiHead = false;
    int semCount = GetMaxParallelism();
    Semaphore sem(semCount);

    for(auto iter = clist_req.cbegin(); iter != clist_req.cend(); ++iter) {
        S3fsCurl* s3fscurl = iter->get();
        if(!s3fscurl){
            continue;
        }

        // Block until a worker slot is free.
        sem.acquire();

        {
            // Reap threads that have already finished so the map does not
            // grow without bound while requests are still being launched.
            const std::lock_guard<std::mutex> lock(completed_tids_lock);
            for(const auto &thread_id : completed_tids){
                auto it = threads.find(thread_id);
                it->second.first.join();
                long int int_retval = it->second.second.get();
                // -ENOENT from a HEAD batch is expected (missing object) and
                // is not worth a warning.
                if (int_retval && !(int_retval == -ENOENT && isMultiHead)) {
                    S3FS_PRN_WARN("thread terminated with non-zero return code: %ld", int_retval);
                }
                threads.erase(it);
            }
            completed_tids.clear();
        }

        // Give the worker the hooks it needs to signal completion back here.
        s3fscurl->sem = &sem;
        s3fscurl->completed_tids_lock = &completed_tids_lock;
        s3fscurl->completed_tids = &completed_tids;

        isMultiHead |= s3fscurl->GetOp() == "HEAD";

        std::promise<int> promise;
        std::future<int> future = promise.get_future();
        std::thread thread(S3fsMultiCurl::RequestPerformWrapper, s3fscurl, std::move(promise));
        auto thread_id = thread.get_id();
        threads.emplace(std::piecewise_construct, std::forward_as_tuple(thread_id), std::forward_as_tuple(std::move(thread), std::move(future)));
    }

    // Wait for all outstanding workers by draining every semaphore slot.
    for(int i = 0; i < semCount; ++i){
        sem.acquire();
    }

    // Join the remaining threads and collect their results.
    const std::lock_guard<std::mutex> lock(completed_tids_lock);
    for(const auto &thread_id : completed_tids){
        auto it = threads.find(thread_id);
        it->second.first.join();
        auto int_retval = it->second.second.get();
        // [NOTE]
        // For multipart HEAD requests(ex. readdir), we need to consider the
        // cases where you may get ENOENT and EPERM.
        // For example, we will get ENOENT when sending a HEAD request to a
        // directory of an object whose path contains a directory uploaded
        // from a client other than s3fs.
        // We will also get EPERM if you try to read an encrypted object
        // (such as KMS) without specifying SSE or with a different KMS key.
        // In these cases, if we end this method with that error result,
        // the caller will not be able to continue processing.(readdir will
        // fail.)
        // Therefore, if those conditions are met here, avoid setting it to
        // result.
        //
        if(!isMultiHead || (-ENOENT != int_retval && -EPERM != int_retval)){
            S3FS_PRN_WARN("thread terminated with non-zero return code: %d", int_retval);
            result = int_retval;
        }
        threads.erase(it);
    }
    completed_tids.clear();
    return result;
}
// Evaluate the responses of all performed requests in clist_req.
// Successful requests trigger SuccessCallback/NotFoundCallback; failed
// requests are rebuilt via RetryCallback and queued back into clist_all for
// the next Request() round. Requests whose result is not yet available are
// postponed to the end of the list and re-evaluated.
// Returns 0, or -EIO when a retry could not be built (unless not_abort).
int S3fsMultiCurl::MultiRead()
{
    int result = 0;

    for(auto iter = clist_req.begin(); iter != clist_req.end(); ){
        // take ownership of the entry; the slot in clist_req is erased below
        std::unique_ptr<S3fsCurl> s3fscurl(std::move(*iter));

        bool isRetry = false;
        bool isPostpone = false;
        bool isNeedResetOffset = true;
        long responseCode = S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET;
        CURLcode curlCode = s3fscurl->GetCurlCode();

        if(s3fscurl->GetResponseCode(responseCode, false) && curlCode == CURLE_OK){
            if(S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET == responseCode){
                // This is a case where the processing result has not yet been updated (should be very rare).
                isPostpone = true;
            }else if(400 > responseCode){
                // add into stat cache
                // cppcheck-suppress unmatchedSuppression
                // cppcheck-suppress knownPointerToBool
                if(SuccessCallback && !SuccessCallback(s3fscurl.get(), pSuccessCallbackParam)){
                    S3FS_PRN_WARN("error from success callback function(%s).", s3fscurl->url.c_str());
                }
            }else if(400 == responseCode){
                // as possibly in multipart
                S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
                isRetry = true;
            }else if(404 == responseCode){
                // not found
                // HEAD requests on readdir_multi_head can return 404
                if(s3fscurl->GetOp() != "HEAD"){
                    S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
                }
                // Call callback function
                // cppcheck-suppress unmatchedSuppression
                // cppcheck-suppress knownPointerToBool
                if(NotFoundCallback && !NotFoundCallback(s3fscurl.get(), pNotFoundCallbackParam)){
                    S3FS_PRN_WARN("error from not found callback function(%s).", s3fscurl->url.c_str());
                }
            }else if(500 == responseCode){
                // case of all other result, do retry.(11/13/2013)
                // because it was found that s3fs got 500 error from S3, but could success
                // to retry it.
                S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
                isRetry = true;
            }else{
                // Retry in other case.
                S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str());
                isRetry = true;
            }
        }else{
            S3FS_PRN_ERR("failed a request(Unknown response code: %s)", s3fscurl->url.c_str());
            // Reuse particular file
            // (timeouts/partial transfers keep the current download offset so
            // the retry can resume instead of restarting)
            switch(curlCode){
                case CURLE_OPERATION_TIMEDOUT:
                    isRetry = true;
                    isNeedResetOffset = false;
                    break;
                case CURLE_PARTIAL_FILE:
                    isRetry = true;
                    isNeedResetOffset = false;
                    break;
                default:
                    S3FS_PRN_ERR("###curlCode: %d msg: %s", curlCode, curl_easy_strerror(curlCode));
                    isRetry = true;
                    break;
            }
        }

        if(isPostpone){
            clist_req.erase(iter);
            clist_req.push_back(std::move(s3fscurl)); // Re-evaluate at the end
            iter = clist_req.begin();
        }else{
            if(!isRetry || (!not_abort && 0 != result)){
                // If an EIO error has already occurred, it will be terminated
                // immediately even if retry processing is required.
                s3fscurl->DestroyCurlHandle();
            }else{
                // Reset offset
                if(isNeedResetOffset){
                    S3fsCurl::ResetOffset(s3fscurl.get());
                }

                // For retry
                std::unique_ptr<S3fsCurl> retrycurl;
                const S3fsCurl* retrycurl_ptr = retrycurl.get(); // save this due to std::move below
                if(RetryCallback){
                    retrycurl = RetryCallback(s3fscurl.get());
                    if(nullptr != retrycurl){
                        clist_all.push_back(std::move(retrycurl));
                    }else{
                        // set EIO and wait for other parts.
                        result = -EIO;
                    }
                }
                // cppcheck-suppress mismatchingContainers
                if(s3fscurl.get() != retrycurl_ptr){
                    s3fscurl->DestroyCurlHandle();
                }
            }
            iter = clist_req.erase(iter);
        }
    }
    clist_req.clear();

    if(!not_abort && 0 != result){
        // If an EIO error has already occurred, clear all retry objects.
        for(auto iter = clist_all.cbegin(); iter != clist_all.cend(); ++iter){
            S3fsCurl* s3fscurl = iter->get();
            s3fscurl->DestroyCurlHandle();
        }
        clist_all.clear();
    }
    return result;
}
// Send every queued request, looping while retries re-populate the queue.
// (When many requests are sent at once, "Couldn't connect to server" can
// occur; MultiRead() queues retry objects back into clist_all.)
// Returns 0 on success, or the first error from MultiPerform()/MultiRead().
int S3fsMultiCurl::Request()
{
    S3FS_PRN_INFO3("[count=%zu]", clist_all.size());

    while(!clist_all.empty()){
        // move all pending requests into the in-flight list
        for(auto& pending : clist_all){
            clist_req.push_back(std::move(pending));
        }
        clist_all.clear();

        // perform the in-flight requests in parallel
        int result = MultiPerform();
        if(0 != result){
            Clear();
            return result;
        }

        // evaluate responses (may queue retries back into clist_all)
        if(0 != (result = MultiRead())){
            Clear();
            return result;
        }

        // release the curl handles used by this round
        ClearEx(false);
    }
    return 0;
}
//
// thread function for performing an S3fsCurl request
//
// Runs the (optional) lazy setup, performs the request, then — under the
// owner's lock — records this thread id as completed, releases the owner's
// semaphore slot, and finally publishes the result through the promise.
void S3fsMultiCurl::RequestPerformWrapper(S3fsCurl* s3fscurl, std::promise<int> promise)
{
    int result = 0;
    if(!s3fscurl){
        // this doesn't signal completion but also never happens
        promise.set_value(-EIO);
        return;
    }
    if(s3fscurl->fpLazySetup){
        if(!s3fscurl->fpLazySetup(s3fscurl)){
            S3FS_PRN_ERR("Failed to lazy setup, then respond EIO.");
            result = -EIO;
        }
    }

    if(!result){
        result = s3fscurl->RequestPerform();
        s3fscurl->DestroyCurlHandle(false);
    }

    // completed_tids must be updated before the semaphore is released so the
    // launcher always finds this thread id when it wakes up.
    const std::lock_guard<std::mutex> lock(*s3fscurl->completed_tids_lock);
    s3fscurl->completed_tids->push_back(std::this_thread::get_id());
    s3fscurl->sem->release();

    promise.set_value(result);
}
/*
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: expandtab sw=4 ts=4 fdm=marker
* vim<600: expandtab sw=4 ts=4
*/

View File

@ -1,99 +0,0 @@
/*
* s3fs - FUSE-based file system backed by Amazon S3
*
* Copyright(C) 2007 Randy Rizun <rrizun@gmail.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#ifndef S3FS_CURL_MULTI_H_
#define S3FS_CURL_MULTI_H_
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
#include "common.h"
//----------------------------------------------
// Typedef
//----------------------------------------------
class S3fsCurl;

// list of owned request objects managed by S3fsMultiCurl
typedef std::vector<std::unique_ptr<S3fsCurl>> s3fscurllist_t;

typedef bool (*S3fsMultiSuccessCallback)(S3fsCurl* s3fscurl, void* param); // callback for succeed multi request
typedef bool (*S3fsMultiNotFoundCallback)(S3fsCurl* s3fscurl, void* param); // callback for a 404(not found) response
typedef std::unique_ptr<S3fsCurl> (*S3fsMultiRetryCallback)(S3fsCurl* s3fscurl); // callback for failure and retrying
//----------------------------------------------
// class S3fsMultiCurl
//----------------------------------------------
//
// Drives a set of S3fsCurl requests in parallel, one thread per request,
// bounded by maxParallelism. Requests are queued with SetS3fsCurlObject()
// and executed by Request().
//
class S3fsMultiCurl
{
    private:
        const int maxParallelism;                   // upper bound on concurrently running request threads
        s3fscurllist_t clist_all;                   // all of curl requests
        s3fscurllist_t clist_req;                   // curl requests are sent
        bool not_abort;                             // complete all requests without aborting on errors
        S3fsMultiSuccessCallback SuccessCallback;   // invoked for responses with code < 400
        S3fsMultiNotFoundCallback NotFoundCallback; // invoked for 404 responses
        S3fsMultiRetryCallback RetryCallback;       // builds a replacement request after a failure
        void* pSuccessCallbackParam;                // opaque argument for SuccessCallback
        void* pNotFoundCallbackParam;               // opaque argument for NotFoundCallback
        std::mutex completed_tids_lock;
        std::vector<std::thread::id> completed_tids GUARDED_BY(completed_tids_lock); // ids of finished worker threads awaiting join

    private:
        bool ClearEx(bool is_all);
        int MultiPerform();
        int MultiRead();
        static void RequestPerformWrapper(S3fsCurl* s3fscurl, std::promise<int> promise);

    public:
        explicit S3fsMultiCurl(int maxParallelism, bool not_abort = false);
        ~S3fsMultiCurl();
        S3fsMultiCurl(const S3fsMultiCurl&) = delete;
        S3fsMultiCurl(S3fsMultiCurl&&) = delete;
        S3fsMultiCurl& operator=(const S3fsMultiCurl&) = delete;
        S3fsMultiCurl& operator=(S3fsMultiCurl&&) = delete;

        int GetMaxParallelism() const { return maxParallelism; }

        S3fsMultiSuccessCallback SetSuccessCallback(S3fsMultiSuccessCallback function);
        S3fsMultiNotFoundCallback SetNotFoundCallback(S3fsMultiNotFoundCallback function);
        S3fsMultiRetryCallback SetRetryCallback(S3fsMultiRetryCallback function);
        void* SetSuccessCallbackParam(void* param);
        void* SetNotFoundCallbackParam(void* param);
        bool Clear() { return ClearEx(true); }
        bool SetS3fsCurlObject(std::unique_ptr<S3fsCurl> s3fscurl);
        int Request();
};
#endif // S3FS_CURL_MULTI_H_
/*
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: expandtab sw=4 ts=4 fdm=marker
* vim<600: expandtab sw=4 ts=4
*/

View File

@ -1551,8 +1551,7 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
}else if(pagelist.Size() >= S3fsCurl::GetMultipartSize()){
// multipart uploading
result = S3fsCurl::ParallelMultipartUploadRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd);
result = multipart_upload_request((tpath ? std::string(tpath) : tmppath), tmporgmeta, physical_fd);
}else{
// normal uploading (too small part size)
@ -1694,7 +1693,7 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
}
// multipart uploading with copy api
result = S3fsCurl::ParallelMixMultipartUploadRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd, mixuppages);
result = mix_multipart_upload_request((tpath ? std::string(tpath) : tmppath), tmporgmeta, physical_fd, mixuppages);
}else{
// normal uploading (too small part size)

View File

@ -43,7 +43,6 @@
#include "fdcache_auto.h"
#include "fdcache_stat.h"
#include "curl.h"
#include "curl_multi.h"
#include "curl_util.h"
#include "s3objlist.h"
#include "cache.h"

View File

@ -945,6 +945,221 @@ int await_multipart_upload_part_request(const std::string& path, int upload_fd,
return 0;
}
//
// Complete sequence of Multipart Upload Requests processing
//
// Call the following function:
// pre_multipart_upload_request()
// multipart_upload_part_request()
// abort_multipart_upload_request()
// complete_multipart_upload_request()
//
// Each part is handed to a worker thread by multipart_upload_part_request();
// upload_sem is released once per finished part and last_result collects the
// first worker error (guarded by result_lock).
// Returns 0 on success or a negative errno value.
int multipart_upload_request(const std::string& path, const headers_t& meta, int upload_fd)
{
    S3FS_PRN_INFO3("Multipart Upload Request [path=%s][upload_fd=%d]", path.c_str(), upload_fd);

    // Get file stat
    struct stat st;
    if(-1 == fstat(upload_fd, &st)){
        S3FS_PRN_ERR("Invalid file descriptor(errno=%d)", errno);
        return -errno;
    }

    // Get upload id
    std::string upload_id;
    int result;
    if(0 != (result = pre_multipart_upload_request(path, meta, upload_id))){
        return result;
    }

    Semaphore upload_sem(0);
    std::mutex result_lock; // protects last_result
    int last_result = 0;
    int req_count = 0; // request count(the part number will be this value +1.)
    etaglist_t list;

    // cycle through open upload_fd, pulling off 10MB chunks at a time
    for(off_t remaining_bytes = st.st_size; 0 < remaining_bytes; ++req_count){
        // add new etagpair to etaglist_t list
        list.emplace_back(nullptr, (req_count + 1));
        etagpair* petag = &list.back();

        off_t start = st.st_size - remaining_bytes;
        off_t chunk = std::min(remaining_bytes, S3fsCurl::GetMultipartSize());

        S3FS_PRN_INFO3("Multipart Upload Part [path=%s][start=%lld][size=%lld][part_num=%d]", path.c_str(), static_cast<long long int>(start), static_cast<long long int>(chunk), (req_count + 1));

        // setup instruction and request on another thread
        if(0 != (result = multipart_upload_part_request(path, upload_fd, start, chunk, (req_count + 1), upload_id, petag, false, &upload_sem, &result_lock, &last_result))){
            S3FS_PRN_ERR("failed setup instruction for Multipart Upload Part Request by error(%d) [path=%s][start=%lld][size=%lld][part_num=%d]", result, path.c_str(), static_cast<long long int>(start), static_cast<long long int>(chunk), (req_count + 1));

            // [NOTE]
            // Hold onto result until all request finish.
            break;
        }
        remaining_bytes -= chunk;
    }

    // wait for finish all requests
    // (req_count only counts successfully launched parts, so every acquire
    // is matched by a worker's release)
    while(req_count > 0){
        upload_sem.acquire();
        --req_count;
    }

    // check result
    if(0 != result || 0 != last_result){
        S3FS_PRN_ERR("Error occurred in Multipart Upload Request (errno=%d).", (0 != result ? result : last_result));

        int result2;
        if(0 != (result2 = abort_multipart_upload_request(path, upload_id))){
            S3FS_PRN_ERR("Error aborting Multipart Upload Request (errno=%d).", result2);
        }
        return (0 != result ? result : last_result);
    }

    // complete requests
    if(0 != (result = complete_multipart_upload_request(path, upload_id, list))){
        S3FS_PRN_ERR("Error occurred in Completion for Multipart Upload Request (errno=%d).", result);
        return result;
    }
    return 0;
}
//
// Complete sequence of Mix Multipart Upload Requests processing
//
// Call the following function:
//    pre_multipart_upload_request()
//    multipart_upload_part_request()
//    abort_multipart_upload_request()
//    complete_multipart_upload_request()
//
// For each page in mixuppages: modified pages are uploaded from upload_fd,
// unmodified pages are server-side copied from the existing object
// (UploadPartCopy), splitting copy ranges so each part stays within
// [MIN_MULTIPART_SIZE, FIVE_GB]. All part requests run on other threads and
// are awaited via upload_sem; on any error the upload is aborted.
//
// path       : object path to upload to
// meta       : request headers; Content-Type and x-amz-copy-source(-range)
//              entries are set/overwritten here for the copy parts
// upload_fd  : readable file descriptor holding the modified data
// mixuppages : page list describing which byte ranges are modified
// Returns 0 on success, a negative errno/result code on failure.
//
int mix_multipart_upload_request(const std::string& path, headers_t& meta, int upload_fd, const fdpage_list_t& mixuppages)
{
    S3FS_PRN_INFO3("Mix Multipart Upload Request [path=%s][upload_fd=%d]", path.c_str(), upload_fd);

    // Get file stat
    struct stat st;
    if(-1 == fstat(upload_fd, &st)){
        S3FS_PRN_ERR("Invalid file descriptor(errno=%d)", errno);
        return -errno;
    }

    // Get upload id
    std::string upload_id;
    int result;
    if(0 != (result = pre_multipart_upload_request(path, meta, upload_id))){
        return result;
    }

    // Prepare headers for Multipart Upload Copy
    std::string srcresource;
    std::string srcurl;
    MakeUrlResource(get_realpath(path.c_str()).c_str(), srcresource, srcurl);
    meta["Content-Type"]      = S3fsCurl::LookupMimeType(path);
    meta["x-amz-copy-source"] = srcresource;

    // upload_sem is posted once per successfully dispatched part request;
    // last_result (guarded by result_lock) collects the first worker error.
    Semaphore upload_sem(0);
    std::mutex result_lock;         // protects last_result
    int        last_result = 0;
    int        req_count   = 0;     // request count(the part number will be this value +1.)
    etaglist_t list;

    for(auto iter = mixuppages.cbegin(); iter != mixuppages.cend(); ++iter){
        if(iter->modified){
            //
            // Multipart Upload Content
            //
            // add new etagpair to etaglist_t list
            //
            // [NOTE](review): petag points into list and is written by the
            // worker thread while further elements are appended — assumes
            // etaglist_t keeps element addresses stable; confirm.
            list.emplace_back(nullptr, (req_count + 1));
            etagpair* petag = &list.back();

            S3FS_PRN_INFO3("Mix Multipart Upload Content Part [path=%s][start=%lld][size=%lld][part_num=%d]", path.c_str(), static_cast<long long int>(iter->offset), static_cast<long long int>(iter->bytes), (req_count + 1));

            // setup instruction and request on another thread
            if(0 != (result = multipart_upload_part_request(path, upload_fd, iter->offset, iter->bytes, (req_count + 1), upload_id, petag, false, &upload_sem, &result_lock, &last_result))){
                S3FS_PRN_ERR("Failed setup instruction for Mix Multipart Upload Content Part Request by error(%d) [path=%s][start=%lld][size=%lld][part_num=%d]", result, path.c_str(), static_cast<long long int>(iter->offset), static_cast<long long int>(iter->bytes), (req_count + 1));
                // [NOTE]
                // Hold onto result until all request finish.
                break;
            }
            ++req_count;
        }else{
            //
            // Multipart Upload Copy
            //
            // [NOTE]
            // Each part must be larger than MIN_MULTIPART_SIZE and smaller than FIVE_GB, then loop.
            // This loop breaks if result is not 0.
            //
            for(off_t processed_bytes = 0, request_bytes = 0; processed_bytes < iter->bytes && 0 == result; processed_bytes += request_bytes){
                // Set temporary part sizes
                request_bytes = std::min(S3fsCurl::GetMultipartCopySize(), (iter->bytes - processed_bytes));

                // Check lastest part size
                // (avoid leaving a final remainder smaller than MIN_MULTIPART_SIZE:
                //  either split evenly or fold the remainder into this part)
                off_t remain_bytes = iter->bytes - processed_bytes - request_bytes;
                if((0 < remain_bytes) && (remain_bytes < MIN_MULTIPART_SIZE)){
                    if(FIVE_GB < (request_bytes + remain_bytes)){
                        request_bytes = (request_bytes + remain_bytes) / 2;
                    } else{
                        request_bytes += remain_bytes;
                    }
                }

                // Set headers for Multipart Upload Copy
                std::ostringstream strrange;
                strrange << "bytes=" << (iter->offset + processed_bytes) << "-" << (iter->offset + processed_bytes + request_bytes - 1);
                meta["x-amz-copy-source-range"] = strrange.str();

                // add new etagpair to etaglist_t list
                list.emplace_back(nullptr, (req_count + 1));
                etagpair* petag = &list.back();

                S3FS_PRN_INFO3("Mix Multipart Upload Copy Part [path=%s][start=%lld][size=%lld][part_num=%d]", path.c_str(), static_cast<long long int>(iter->offset + processed_bytes), static_cast<long long int>(request_bytes), (req_count + 1));

                // setup instruction and request on another thread
                if(0 != (result = multipart_upload_part_request(path, upload_fd, (iter->offset + processed_bytes), request_bytes, (req_count + 1), upload_id, petag, true, &upload_sem, &result_lock, &last_result))){
                    S3FS_PRN_ERR("Failed setup instruction for Mix Multipart Upload Copy Part Request by error(%d) [path=%s][start=%lld][size=%lld][part_num=%d]", result, path.c_str(), static_cast<long long int>(iter->offset + processed_bytes), static_cast<long long int>(request_bytes), (req_count + 1));
                    // [FIXED]
                    // Do not count this failed dispatch: no request was
                    // started for it, so the semaphore will not be posted and
                    // counting it could deadlock the wait loop below. This
                    // also matches the handling of the modified-page branch.
                    break;
                }
                ++req_count;
            }
            if(0 != result){
                // [NOTE]
                // Hold onto result until all request finish.
                break;
            }
        }
    }

    // wait for finish all requests
    // (req_count counts only successfully dispatched requests)
    while(req_count > 0){
        upload_sem.acquire();
        --req_count;
    }

    // check result
    if(0 != result || 0 != last_result){
        S3FS_PRN_ERR("Error occurred in Mix Multipart Upload Request (errno=%d).", (0 != result ? result : last_result));

        int result2;
        if(0 != (result2 = abort_multipart_upload_request(path, upload_id))){
            S3FS_PRN_ERR("Error aborting Mix Multipart Upload Request (errno=%d).", result2);
        }
        return (0 != result ? result : last_result);
    }

    // complete requests
    if(0 != (result = complete_multipart_upload_request(path, upload_id, list))){
        S3FS_PRN_ERR("Error occurred in Completion for Mix Multipart Upload Request (errno=%d).", result);
        return result;
    }
    return 0;
}
//
// Calls S3fsCurl::MultipartUploadComplete via complete_multipart_upload_threadworker
//

View File

@ -237,6 +237,8 @@ int check_service_request(const std::string& strpath, bool forceNoSSE, bool supp
int pre_multipart_upload_request(const std::string& path, const headers_t& meta, std::string& upload_id);
int multipart_upload_part_request(const std::string& path, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy, Semaphore* psem, std::mutex* pthparam_lock, int* req_result);
int await_multipart_upload_part_request(const std::string& path, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy);
int multipart_upload_request(const std::string& path, const headers_t& meta, int upload_fd);
int mix_multipart_upload_request(const std::string& path, headers_t& meta, int upload_fd, const fdpage_list_t& mixuppages);
int complete_multipart_upload_request(const std::string& path, const std::string& upload_id, const etaglist_t& parts);
int abort_multipart_upload_request(const std::string& path, const std::string& upload_id);
int multipart_put_head_request(const std::string& strfrom, const std::string& strto, off_t size, const headers_t& meta);