Refactored single-type requests to go through ThreadPoolMan

Takeshi Nakatani 2024-07-15 06:40:05 +00:00 committed by Andrew Gaul
parent a680d3e138
commit efc23316e9
16 changed files with 1463 additions and 456 deletions
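Across these files the change follows one pattern: instead of constructing an S3fsCurl object inline, each single-type request now fills a small parameter struct, wraps it in a thpoolman_param together with a worker function, and hands it to ThreadPoolMan::AwaitInstruct, which runs the worker on the pool and waits for it to finish. The sketch below illustrates that pattern only; demo_req_thparam, demo_req_threadworker, and demo_request are hypothetical names, while thpoolman_param and ThreadPoolMan::AwaitInstruct follow the usage visible in the hunks that follow.

#include <cerrno>
#include <string>

#include "threadpoolman.h"   // thpoolman_param, ThreadPoolMan (as used in the hunks below)

// Hypothetical request parameters passed to the worker through the pool.
struct demo_req_thparam
{
    std::string path;
    int         result = 0;
};

// Hypothetical worker: runs on a pool thread and stores its result in the param struct.
static void* demo_req_threadworker(void* arg)
{
    auto* pthparam = static_cast<demo_req_thparam*>(arg);
    if(!pthparam){
        return reinterpret_cast<void*>(-EIO);
    }
    // ... issue the single request here (e.g. via S3fsCurl) and set pthparam->result ...
    return reinterpret_cast<void*>(pthparam->result);
}

// Hypothetical caller: dispatches the worker and waits for completion.
static int demo_request(const std::string& path)
{
    demo_req_thparam thargs;
    thargs.path = path;

    thpoolman_param ppoolparam;
    ppoolparam.args  = &thargs;
    ppoolparam.psem  = nullptr;                  // no semaphore: await completion
    ppoolparam.pfunc = demo_req_threadworker;

    if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
        return -EIO;
    }
    return thargs.result;
}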

View File

@ -43,6 +43,7 @@ s3fs_SOURCES = \
string_util.cpp \
s3fs_cred.cpp \
s3fs_util.cpp \
s3fs_threadreqs.cpp \
fdcache.cpp \
fdcache_entity.cpp \
fdcache_page.cpp \

View File

@ -42,6 +42,7 @@
#include "s3fs_util.h"
#include "string_util.h"
#include "addhead.h"
#include "s3fs_threadreqs.h"
//-------------------------------------------------------------------
// Symbols
@ -1216,28 +1217,28 @@ bool S3fsCurl::SetIPResolveType(const char* value)
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress constParameter
// cppcheck-suppress constParameterCallback
bool S3fsCurl::UploadMultipartPostCallback(S3fsCurl* s3fscurl, void* param)
bool S3fsCurl::MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param)
{
if(!s3fscurl || param){ // this callback does not need a parameter
return false;
}
return s3fscurl->UploadMultipartPostComplete();
return s3fscurl->MultipartUploadPartComplete();
}
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress constParameter
// cppcheck-suppress constParameterCallback
bool S3fsCurl::MixMultipartPostCallback(S3fsCurl* s3fscurl, void* param)
bool S3fsCurl::MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param)
{
if(!s3fscurl || param){ // this callback does not need a parameter
return false;
}
return s3fscurl->MixMultipartPostComplete();
return s3fscurl->MixMultipartUploadComplete();
}
std::unique_ptr<S3fsCurl> S3fsCurl::UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl)
std::unique_ptr<S3fsCurl> S3fsCurl::MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return nullptr;
@ -1277,14 +1278,14 @@ std::unique_ptr<S3fsCurl> S3fsCurl::UploadMultipartPostRetryCallback(S3fsCurl* s
newcurl->type = s3fscurl->type;
// setup new curl object
if(0 != newcurl->UploadMultipartPostSetup(s3fscurl->path.c_str(), part_num, upload_id)){
if(0 != newcurl->MultipartUploadPartSetup(s3fscurl->path.c_str(), part_num, upload_id)){
S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num);
return nullptr;
}
return newcurl;
}
std::unique_ptr<S3fsCurl> S3fsCurl::CopyMultipartPostRetryCallback(S3fsCurl* s3fscurl)
std::unique_ptr<S3fsCurl> S3fsCurl::CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return nullptr;
@ -1321,23 +1322,23 @@ std::unique_ptr<S3fsCurl> S3fsCurl::CopyMultipartPostRetryCallback(S3fsCurl* s3f
newcurl->type = s3fscurl->type;
// setup new curl object
if(0 != newcurl->CopyMultipartPostSetup(s3fscurl->b_from.c_str(), s3fscurl->path.c_str(), part_num, upload_id, s3fscurl->b_meta)){
if(0 != newcurl->CopyMultipartUploadSetup(s3fscurl->b_from.c_str(), s3fscurl->path.c_str(), part_num, upload_id, s3fscurl->b_meta)){
S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num);
return nullptr;
}
return newcurl;
}
std::unique_ptr<S3fsCurl> S3fsCurl::MixMultipartPostRetryCallback(S3fsCurl* s3fscurl)
std::unique_ptr<S3fsCurl> S3fsCurl::MixMultipartUploadRetryCallback(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return nullptr;
}
if(-1 == s3fscurl->partdata.fd){
return S3fsCurl::CopyMultipartPostRetryCallback(s3fscurl);
return S3fsCurl::CopyMultipartUploadRetryCallback(s3fscurl);
}else{
return S3fsCurl::UploadMultipartPostRetryCallback(s3fscurl);
return S3fsCurl::MultipartUploadPartRetryCallback(s3fscurl);
}
}
@ -1395,7 +1396,7 @@ std::unique_ptr<S3fsCurl> S3fsCurl::CreateParallelS3fsCurl(const char* tpath, in
S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(start), static_cast<long long>(size), part_num);
if(0 != (result = s3fscurl->UploadMultipartPostSetup(tpath, part_num, upload_id))){
if(0 != (result = s3fscurl->MultipartUploadPartSetup(tpath, part_num, upload_id))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return nullptr;
}
@ -1416,7 +1417,7 @@ std::unique_ptr<S3fsCurl> S3fsCurl::CreateParallelS3fsCurl(const char* tpath, in
S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(start), static_cast<long long>(size), part_num);
if(0 != (result = s3fscurl->CopyMultipartPostSetup(tpath, tpath, part_num, upload_id, meta))){
if(0 != (result = s3fscurl->CopyMultipartUploadSetup(tpath, tpath, part_num, upload_id, meta))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return nullptr;
}
@ -1431,14 +1432,13 @@ std::unique_ptr<S3fsCurl> S3fsCurl::CreateParallelS3fsCurl(const char* tpath, in
return s3fscurl;
}
int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd)
int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd)
{
int result;
std::string upload_id;
struct stat st;
etaglist_t list;
off_t remaining_bytes;
S3fsCurl s3fscurl(true);
S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd);
@ -1447,15 +1447,14 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta,
return -errno;
}
if(0 != (result = s3fscurl.PreMultipartPostRequest(tpath, meta, upload_id))){
if(0 != (result = pre_multipart_upload_request(std::string(tpath), meta, upload_id))){
return result;
}
s3fscurl.DestroyCurlHandle();
// Initialize S3fsMultiCurl
S3fsMultiCurl curlmulti(GetMaxParallelCount());
curlmulti.SetSuccessCallback(S3fsCurl::UploadMultipartPostCallback);
curlmulti.SetRetryCallback(S3fsCurl::UploadMultipartPostRetryCallback);
curlmulti.SetSuccessCallback(S3fsCurl::MultipartUploadPartCallback);
curlmulti.SetRetryCallback(S3fsCurl::MultipartUploadPartRetryCallback);
// cycle through open fd, pulling off 10MB chunks at a time
for(remaining_bytes = st.st_size; 0 < remaining_bytes; ){
@ -1471,7 +1470,7 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta,
s3fscurl_para->partdata.add_etag_list(list);
// initiate upload part for parallel
if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
if(0 != (result = s3fscurl_para->MultipartUploadPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}
@ -1487,18 +1486,14 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta,
// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
S3fsCurl s3fscurl_abort(true);
int result2 = s3fscurl_abort.AbortMultipartUpload(tpath, upload_id);
s3fscurl_abort.DestroyCurlHandle();
if(result2 != 0){
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
return result;
}
if(0 != (result = s3fscurl.CompleteMultipartPostRequest(tpath, upload_id, list))){
if(0 != (result = complete_multipart_upload_request(std::string(tpath), upload_id, list))){
return result;
}
return 0;
@ -1510,7 +1505,6 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me
std::string upload_id;
struct stat st;
etaglist_t list;
S3fsCurl s3fscurl(true);
S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd);
@ -1519,10 +1513,9 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me
return -errno;
}
if(0 != (result = s3fscurl.PreMultipartPostRequest(tpath, meta, upload_id))){
if(0 != (result = pre_multipart_upload_request(std::string(tpath), meta, upload_id))){
return result;
}
s3fscurl.DestroyCurlHandle();
// for copy multipart
std::string srcresource;
@ -1533,8 +1526,8 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me
// Initialize S3fsMultiCurl
S3fsMultiCurl curlmulti(GetMaxParallelCount());
curlmulti.SetSuccessCallback(S3fsCurl::MixMultipartPostCallback);
curlmulti.SetRetryCallback(S3fsCurl::MixMultipartPostRetryCallback);
curlmulti.SetSuccessCallback(S3fsCurl::MixMultipartUploadCallback);
curlmulti.SetRetryCallback(S3fsCurl::MixMultipartUploadRetryCallback);
for(auto iter = mixuppages.cbegin(); iter != mixuppages.cend(); ++iter){
if(iter->modified){
@ -1550,7 +1543,7 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me
S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(iter->offset), static_cast<long long>(iter->bytes), s3fscurl_para->partdata.get_part_number());
// initiate upload part for parallel
if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
if(0 != (result = s3fscurl_para->MultipartUploadPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}
@ -1588,7 +1581,7 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me
S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(iter->offset + i), static_cast<long long>(bytes), s3fscurl_para->partdata.get_part_number());
// initiate upload part for parallel
if(0 != (result = s3fscurl_para->CopyMultipartPostSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}
@ -1605,17 +1598,14 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me
// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
S3fsCurl s3fscurl_abort(true);
int result2 = s3fscurl_abort.AbortMultipartUpload(tpath, upload_id);
s3fscurl_abort.DestroyCurlHandle();
if(result2 != 0){
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
return result;
}
if(0 != (result = s3fscurl.CompleteMultipartPostRequest(tpath, upload_id, list))){
if(0 != (result = complete_multipart_upload_request(std::string(tpath), upload_id, list))){
return result;
}
return 0;
@ -1698,7 +1688,7 @@ int S3fsCurl::ParallelGetObjectRequest(const char* tpath, int fd, off_t start, o
return result;
}
bool S3fsCurl::UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl)
bool S3fsCurl::MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return false;
@ -1740,7 +1730,7 @@ bool S3fsCurl::UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl)
return true;
}
bool S3fsCurl::CopyMultipartPostSetCurlOpts(S3fsCurl* s3fscurl)
bool S3fsCurl::CopyMultipartUploadSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return false;
@ -3406,7 +3396,7 @@ int S3fsCurl::HeadRequest(const char* tpath, headers_t& meta)
return 0;
}
int S3fsCurl::PutHeadRequest(const char* tpath, headers_t& meta, bool is_copy)
int S3fsCurl::PutHeadRequest(const char* tpath, const headers_t& meta, bool is_copy)
{
S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath));
@ -3694,22 +3684,17 @@ int S3fsCurl::PreGetObjectRequest(const char* tpath, int fd, off_t start, off_t
return 0;
}
int S3fsCurl::GetObjectRequest(const char* tpath, int fd, off_t start, off_t size)
int S3fsCurl::GetObjectRequest(const char* tpath, int fd, off_t start, off_t size, sse_type_t ssetype, const std::string& ssevalue)
{
int result;
S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld]", SAFESTRPTR(tpath), static_cast<long long>(start), static_cast<long long>(size));
S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld][ssetype=%u][ssevalue=%s]", SAFESTRPTR(tpath), static_cast<long long>(start), static_cast<long long>(size), static_cast<uint8_t>(ssetype), ssevalue.c_str());
if(!tpath){
return -EINVAL;
}
sse_type_t local_ssetype = sse_type_t::SSE_DISABLE;
std::string ssevalue;
if(!get_object_sse_type(tpath, local_ssetype, ssevalue)){
S3FS_PRN_WARN("Failed to get SSE type for file(%s).", SAFESTRPTR(tpath));
}
if(0 != (result = PreGetObjectRequest(tpath, fd, start, size, local_ssetype, ssevalue))){
if(0 != (result = PreGetObjectRequest(tpath, fd, start, size, ssetype, ssevalue))){
return result;
}
if(!fpLazySetup || !fpLazySetup(this)){
@ -3868,7 +3853,7 @@ int S3fsCurl::ListBucketRequest(const char* tpath, const char* query)
// Date: Mon, 1 Nov 2010 20:34:56 GMT
// Authorization: AWS VGhpcyBtZXNzYWdlIHNpZ25lZCBieSBlbHZpbmc=
//
int S3fsCurl::PreMultipartPostRequest(const char* tpath, headers_t& meta, std::string& upload_id)
int S3fsCurl::PreMultipartUploadRequest(const char* tpath, const headers_t& meta, std::string& upload_id)
{
S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath));
@ -3971,7 +3956,7 @@ int S3fsCurl::PreMultipartPostRequest(const char* tpath, headers_t& meta, std::s
return 0;
}
int S3fsCurl::CompleteMultipartPostRequest(const char* tpath, const std::string& upload_id, const etaglist_t& parts)
int S3fsCurl::MultipartUploadComplete(const char* tpath, const std::string& upload_id, const etaglist_t& parts)
{
S3FS_PRN_INFO3("[tpath=%s][parts=%zu]", SAFESTRPTR(tpath), parts.size());
@ -4179,7 +4164,7 @@ int S3fsCurl::AbortMultipartUpload(const char* tpath, const std::string& upload_
// Content-MD5: pUNXr/BjKK5G2UKvaRRrOA==
// Authorization: AWS VGhpcyBtZXNzYWdlIHNpZ25lZGGieSRlbHZpbmc=
//
int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, const std::string& upload_id)
int S3fsCurl::MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id)
{
S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long int>(partdata.startpos), static_cast<long long int>(partdata.size), part_num);
@ -4234,30 +4219,30 @@ int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, const st
type = REQTYPE::UPLOADMULTIPOST;
// set lazy function
fpLazySetup = UploadMultipartPostSetCurlOpts;
fpLazySetup = MultipartUploadPartSetCurlOpts;
return 0;
}
int S3fsCurl::UploadMultipartPostRequest(const char* tpath, int part_num, const std::string& upload_id)
int S3fsCurl::MultipartUploadPartRequest(const char* tpath, int part_num, const std::string& upload_id)
{
int result;
S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long int>(partdata.startpos), static_cast<long long int>(partdata.size), part_num);
// setup
if(0 != (result = S3fsCurl::UploadMultipartPostSetup(tpath, part_num, upload_id))){
if(0 != (result = S3fsCurl::MultipartUploadPartSetup(tpath, part_num, upload_id))){
return result;
}
if(!fpLazySetup || !fpLazySetup(this)){
S3FS_PRN_ERR("Failed to lazy setup in multipart upload post request.");
S3FS_PRN_ERR("Failed to lazy setup in multipart upload part request.");
return -EIO;
}
// request
if(0 == (result = RequestPerform())){
if(!UploadMultipartPostComplete()){
if(!MultipartUploadPartComplete()){
result = -EIO;
}
}
@ -4269,7 +4254,7 @@ int S3fsCurl::UploadMultipartPostRequest(const char* tpath, int part_num, const
return result;
}
int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta)
int S3fsCurl::CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta)
{
S3FS_PRN_INFO3("[from=%s][to=%s][part=%d]", SAFESTRPTR(from), SAFESTRPTR(to), part_num);
@ -4327,7 +4312,7 @@ int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_
type = REQTYPE::COPYMULTIPOST;
// set lazy function
fpLazySetup = CopyMultipartPostSetCurlOpts;
fpLazySetup = CopyMultipartUploadSetCurlOpts;
// request
S3FS_PRN_INFO3("copying... [from=%s][to=%s][part=%d]", from, to, part_num);
@ -4335,7 +4320,7 @@ int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_
return 0;
}
bool S3fsCurl::UploadMultipartPostComplete()
bool S3fsCurl::MultipartUploadPartComplete()
{
auto it = responseHeaders.find("ETag");
if (it == responseHeaders.cend()) {
@ -4364,7 +4349,7 @@ bool S3fsCurl::UploadMultipartPostComplete()
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress constParameter
// cppcheck-suppress constParameterCallback
bool S3fsCurl::CopyMultipartPostCallback(S3fsCurl* s3fscurl, void* param)
bool S3fsCurl::CopyMultipartUploadCallback(S3fsCurl* s3fscurl, void* param)
{
if(!s3fscurl || param){ // this callback does not need a parameter
return false;
@ -4372,10 +4357,10 @@ bool S3fsCurl::CopyMultipartPostCallback(S3fsCurl* s3fscurl, void* param)
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress knownConditionTrueFalse
return s3fscurl->CopyMultipartPostComplete();
return s3fscurl->CopyMultipartUploadComplete();
}
bool S3fsCurl::CopyMultipartPostComplete()
bool S3fsCurl::CopyMultipartUploadComplete()
{
std::string etag;
partdata.uploaded = simple_parse_xml(bodydata.c_str(), bodydata.size(), "ETag", etag);
@ -4387,13 +4372,13 @@ bool S3fsCurl::CopyMultipartPostComplete()
return true;
}
bool S3fsCurl::MixMultipartPostComplete()
bool S3fsCurl::MixMultipartUploadComplete()
{
bool result;
if(-1 == partdata.fd){
result = CopyMultipartPostComplete();
result = CopyMultipartUploadComplete();
}else{
result = UploadMultipartPostComplete();
result = MultipartUploadPartComplete();
}
return result;
}
@ -4408,15 +4393,15 @@ int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& met
S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath));
if(0 != (result = PreMultipartPostRequest(tpath, meta, upload_id))){
if(0 != (result = PreMultipartUploadRequest(tpath, meta, upload_id))){
return result;
}
DestroyCurlHandle();
// Initialize S3fsMultiCurl
S3fsMultiCurl curlmulti(GetMaxParallelCount());
curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartPostCallback);
curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartPostRetryCallback);
curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartUploadCallback);
curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartUploadRetryCallback);
for(bytes_remaining = size; 0 < bytes_remaining; bytes_remaining -= chunk){
chunk = bytes_remaining > GetMultipartCopySize() ? GetMultipartCopySize() : bytes_remaining;
@ -4432,7 +4417,7 @@ int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& met
s3fscurl_para->partdata.add_etag_list(list);
// initiate upload part for parallel
if(0 != (result = s3fscurl_para->CopyMultipartPostSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}
@ -4447,17 +4432,14 @@ int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& met
// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
S3fsCurl s3fscurl_abort(true);
int result2 = s3fscurl_abort.AbortMultipartUpload(tpath, upload_id);
s3fscurl_abort.DestroyCurlHandle();
if(result2 != 0){
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
return result;
}
if(0 != (result = CompleteMultipartPostRequest(tpath, upload_id, list))){
if(0 != (result = MultipartUploadComplete(tpath, upload_id, list))){
return result;
}
return 0;
@ -4477,7 +4459,7 @@ int S3fsCurl::MultipartUploadRequest(const std::string& upload_id, const char* t
// upload part
int result;
if(0 != (result = UploadMultipartPostRequest(tpath, petagpair->part_num, upload_id))){
if(0 != (result = MultipartUploadPartRequest(tpath, petagpair->part_num, upload_id))){
S3FS_PRN_ERR("failed uploading %d part by error(%d)", petagpair->part_num, result);
return result;
}
@ -4503,15 +4485,15 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t
meta["Content-Type"] = S3fsCurl::LookupMimeType(to);
meta["x-amz-copy-source"] = srcresource;
if(0 != (result = PreMultipartPostRequest(to, meta, upload_id))){
if(0 != (result = PreMultipartUploadRequest(to, meta, upload_id))){
return result;
}
DestroyCurlHandle();
// Initialize S3fsMultiCurl
S3fsMultiCurl curlmulti(GetMaxParallelCount());
curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartPostCallback);
curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartPostRetryCallback);
curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartUploadCallback);
curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartUploadRetryCallback);
for(bytes_remaining = size; 0 < bytes_remaining; bytes_remaining -= chunk){
chunk = bytes_remaining > GetMultipartCopySize() ? GetMultipartCopySize() : bytes_remaining;
@ -4527,7 +4509,7 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t
s3fscurl_para->partdata.add_etag_list(list);
// initiate upload part for parallel
if(0 != (result = s3fscurl_para->CopyMultipartPostSetup(from, to, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(from, to, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}
@ -4542,17 +4524,14 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t
// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
S3fsCurl s3fscurl_abort(true);
int result2 = s3fscurl_abort.AbortMultipartUpload(to, upload_id);
s3fscurl_abort.DestroyCurlHandle();
if(result2 != 0){
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(to), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
return result;
}
if(0 != (result = CompleteMultipartPostRequest(to, upload_id, list))){
if(0 != (result = MultipartUploadComplete(to, upload_id, list))){
return result;
}
return 0;

View File

@ -233,7 +233,6 @@ class S3fsCurl
static int CurlProgress(void *clientp, double dltotal, double dlnow, double ultotal, double ulnow);
static std::string extractURI(const std::string& url);
static bool LocateBundle();
static size_t HeaderCallback(void *data, size_t blockSize, size_t numBlocks, void *userPtr);
static size_t WriteMemoryCallback(void *ptr, size_t blockSize, size_t numBlocks, void *data);
@ -241,16 +240,17 @@ class S3fsCurl
static size_t UploadReadCallback(void *ptr, size_t size, size_t nmemb, void *userp);
static size_t DownloadWriteCallback(void* ptr, size_t size, size_t nmemb, void* userp);
static bool UploadMultipartPostCallback(S3fsCurl* s3fscurl, void* param);
static bool CopyMultipartPostCallback(S3fsCurl* s3fscurl, void* param);
static bool MixMultipartPostCallback(S3fsCurl* s3fscurl, void* param);
static std::unique_ptr<S3fsCurl> UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> CopyMultipartPostRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> MixMultipartPostRetryCallback(S3fsCurl* s3fscurl);
static bool MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param);
static bool CopyMultipartUploadCallback(S3fsCurl* s3fscurl, void* param);
static bool MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param);
static std::unique_ptr<S3fsCurl> MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> MixMultipartUploadRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl);
// lazy functions for set curl options
static bool CopyMultipartPostSetCurlOpts(S3fsCurl* s3fscurl);
static bool MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl);
static bool CopyMultipartUploadSetCurlOpts(S3fsCurl* s3fscurl);
static bool PreGetObjectRequestSetCurlOpts(S3fsCurl* s3fscurl);
static bool PreHeadRequestSetCurlOpts(S3fsCurl* s3fscurl);
@ -275,10 +275,10 @@ class S3fsCurl
bool AddSseRequestHead(sse_type_t ssetype, const std::string& ssevalue, bool is_copy);
std::string CalcSignatureV2(const std::string& method, const std::string& strMD5, const std::string& content_type, const std::string& date, const std::string& resource, const std::string& secret_access_key, const std::string& access_token);
std::string CalcSignature(const std::string& method, const std::string& canonical_uri, const std::string& query_string, const std::string& strdate, const std::string& payload_hash, const std::string& date8601, const std::string& secret_access_key, const std::string& access_token);
int UploadMultipartPostSetup(const char* tpath, int part_num, const std::string& upload_id);
int CopyMultipartPostSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta);
bool UploadMultipartPostComplete();
bool CopyMultipartPostComplete();
int MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id);
int CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta);
bool MultipartUploadPartComplete();
bool CopyMultipartUploadComplete();
int MapPutErrorResponse(int result);
public:
@ -288,13 +288,10 @@ class S3fsCurl
static bool InitMimeType(const std::string& strFile);
static bool DestroyS3fsCurl();
static std::unique_ptr<S3fsCurl> CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result);
static int ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd);
static int ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd);
static int ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages);
static int ParallelGetObjectRequest(const char* tpath, int fd, off_t start, off_t size);
// lazy functions for set curl options(public)
static bool UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl);
// class methods(variables)
static std::string LookupMimeType(const std::string& name);
static bool SetCheckCertificate(bool isCertCheck);
@ -375,16 +372,16 @@ class S3fsCurl
return PreHeadRequest(tpath.c_str(), bpath.c_str(), savedpath.c_str(), ssekey_pos);
}
int HeadRequest(const char* tpath, headers_t& meta);
int PutHeadRequest(const char* tpath, headers_t& meta, bool is_copy);
int PutHeadRequest(const char* tpath, const headers_t& meta, bool is_copy);
int PutRequest(const char* tpath, headers_t& meta, int fd);
int PreGetObjectRequest(const char* tpath, int fd, off_t start, off_t size, sse_type_t ssetype, const std::string& ssevalue);
int GetObjectRequest(const char* tpath, int fd, off_t start = -1, off_t size = -1);
int GetObjectRequest(const char* tpath, int fd, off_t start, off_t size, sse_type_t ssetype, const std::string& ssevalue);
int CheckBucket(const char* check_path, bool compat_dir, bool force_no_sse);
int ListBucketRequest(const char* tpath, const char* query);
int PreMultipartPostRequest(const char* tpath, headers_t& meta, std::string& upload_id);
int CompleteMultipartPostRequest(const char* tpath, const std::string& upload_id, const etaglist_t& parts);
int UploadMultipartPostRequest(const char* tpath, int part_num, const std::string& upload_id);
bool MixMultipartPostComplete();
int PreMultipartUploadRequest(const char* tpath, const headers_t& meta, std::string& upload_id);
int MultipartUploadComplete(const char* tpath, const std::string& upload_id, const etaglist_t& parts);
int MultipartUploadPartRequest(const char* tpath, int part_num, const std::string& upload_id);
bool MixMultipartUploadComplete();
int MultipartListRequest(std::string& body);
int AbortMultipartUpload(const char* tpath, const std::string& upload_id);
int MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta);

View File

@ -40,13 +40,35 @@
#include "s3fs_logger.h"
#include "s3fs_util.h"
#include "curl.h"
#include "curl_util.h"
#include "s3fs_cred.h"
#include "threadpoolman.h"
#include "s3fs_threadreqs.h"
//------------------------------------------------
// Symbols
//------------------------------------------------
static constexpr int MAX_MULTIPART_CNT = 10 * 1000; // S3 multipart max count
//------------------------------------------------
// Structure of parameters to pass to thread
//------------------------------------------------
//
// Multipart Upload Request parameter structure for Thread Pool.
//
// ([TODO] This is a temporary structure that will be moved when S3fsMultiCurl is deprecated.)
//
struct multipart_upload_req_thparam
{
std::string path;
std::string upload_id;
int fd = -1;
off_t start = 0;
off_t size = 0;
etagpair* petagpair = nullptr;
int result = 0;
};
//------------------------------------------------
// FdEntity class variables
//------------------------------------------------
@ -106,6 +128,25 @@ ino_t FdEntity::GetInode(int fd)
return st.st_ino;
}
//
// Worker function for multipart upload request
//
// ([TODO] This is a temporary function that will be moved when S3fsMultiCurl is deprecated.)
//
void* FdEntity::MultipartUploadThreadWorker(void* arg)
{
auto* pthparam = static_cast<multipart_upload_req_thparam*>(arg);
if(!pthparam){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Multipart Upload Request [path=%s][upload id=%s][fd=%d][start=%lld][size=%lld][etagpair=%p]", pthparam->path.c_str(), pthparam->upload_id.c_str(), pthparam->fd, static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->petagpair);
S3fsCurl s3fscurl(true);
pthparam->result = s3fscurl.MultipartUploadRequest(pthparam->upload_id, pthparam->path.c_str(), pthparam->fd, pthparam->start, pthparam->size, pthparam->petagpair);
return reinterpret_cast<void*>(pthparam->result);
}
//------------------------------------------------
// FdEntity methods
//------------------------------------------------
@ -1039,8 +1080,7 @@ int FdEntity::Load(off_t start, off_t size, bool is_modified_flag)
}else{
// single request
if(0 < need_load_size){
S3fsCurl s3fscurl;
result = s3fscurl.GetObjectRequest(path.c_str(), physical_fd, iter->offset, need_load_size);
result = get_object_request(path, physical_fd, iter->offset, need_load_size);
}else{
result = 0;
}
@ -1164,8 +1204,7 @@ int FdEntity::NoCacheLoadAndPost(PseudoFdInfo* pseudo_obj, off_t start, off_t si
// single area get request
if(0 < need_load_size){
S3fsCurl s3fscurl;
if(0 != (result = s3fscurl.GetObjectRequest(path.c_str(), tmpfd, offset, oneread))){
if(0 != (result = get_object_request(path, tmpfd, offset, oneread))){
S3FS_PRN_ERR("failed to get object(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(offset), static_cast<long long int>(oneread), tmpfd);
break;
}
@ -1180,9 +1219,9 @@ int FdEntity::NoCacheLoadAndPost(PseudoFdInfo* pseudo_obj, off_t start, off_t si
}else{
// already loaded area
}
// single area upload by multipart post
if(0 != (result = NoCacheMultipartPost(pseudo_obj, upload_fd, offset, oneread))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(offset), static_cast<long long int>(oneread), upload_fd);
// single area upload by multipart upload
if(0 != (result = NoCacheMultipartUploadRequest(pseudo_obj, upload_fd, offset, oneread))){
S3FS_PRN_ERR("failed to multipart upload(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(offset), static_cast<long long int>(oneread), upload_fd);
break;
}
}
@ -1222,11 +1261,43 @@ int FdEntity::NoCacheLoadAndPost(PseudoFdInfo* pseudo_obj, off_t start, off_t si
return result;
}
//
// Common method that calls S3fsCurl::PreMultipartUploadRequest via pre_multipart_upload_request
//
// [NOTE]
// If the request is successful, initialize upload_id.
//
int FdEntity::PreMultipartUploadRequest(PseudoFdInfo* pseudo_obj)
{
if(!pseudo_obj){
S3FS_PRN_ERR("Internal error, pseudo fd object pointer is null.");
return -EIO;
}
// get upload_id
std::string upload_id;
int result;
if(0 != (result = pre_multipart_upload_request(path, orgmeta, upload_id))){
return result;
}
// reset upload_id
if(!pseudo_obj->InitialUploadInfo(upload_id)){
S3FS_PRN_ERR("failed to initialize upload id(%s)", upload_id.c_str());
return -EIO;
}
// Clear the dirty flag, because the meta data is updated.
pending_status = pending_status_t::NO_UPDATE_PENDING;
return 0;
}
// [NOTE]
// At no disk space for caching object.
// This method is starting multipart uploading.
//
int FdEntity::NoCachePreMultipartPost(PseudoFdInfo* pseudo_obj)
int FdEntity::NoCachePreMultipartUploadRequest(PseudoFdInfo* pseudo_obj)
{
if(!pseudo_obj){
S3FS_PRN_ERR("Internal error, pseudo fd object pointer is null.");
@ -1236,21 +1307,11 @@ int FdEntity::NoCachePreMultipartPost(PseudoFdInfo* pseudo_obj)
// initialize multipart upload values
pseudo_obj->ClearUploadInfo(true);
S3fsCurl s3fscurl(true);
std::string upload_id;
int result;
if(0 != (result = s3fscurl.PreMultipartPostRequest(path.c_str(), orgmeta, upload_id))){
int result;
if(0 != (result = PreMultipartUploadRequest(pseudo_obj))){
return result;
}
s3fscurl.DestroyCurlHandle();
// Clear the dirty flag, because the meta data is updated.
pending_status = pending_status_t::NO_UPDATE_PENDING;
// reset upload_id
if(!pseudo_obj->InitialUploadInfo(upload_id)){
return -EIO;
}
return 0;
}
@ -1258,55 +1319,74 @@ int FdEntity::NoCachePreMultipartPost(PseudoFdInfo* pseudo_obj)
// At no disk space for caching object.
// This method is uploading one part of multipart.
//
int FdEntity::NoCacheMultipartPost(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size)
// ([TODO] This is a temporary modification till S3fsMultiCurl is deprecated.)
//
int FdEntity::NoCacheMultipartUploadRequest(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size)
{
if(-1 == tgfd || !pseudo_obj || !pseudo_obj->IsUploading()){
S3FS_PRN_ERR("Need to initialize for multipart post.");
S3FS_PRN_ERR("Need to initialize for multipart upload.");
return -EIO;
}
// parameter for thread worker
multipart_upload_req_thparam thargs;
thargs.path = path;
thargs.upload_id.clear();
thargs.fd = tgfd;
thargs.start = start;
thargs.size = size;
thargs.petagpair = nullptr;
thargs.result = 0;
// get upload id
std::string upload_id;
if(!pseudo_obj->GetUploadId(upload_id)){
if(!pseudo_obj->GetUploadId(thargs.upload_id)){
return -EIO;
}
// append new part and get it's etag string pointer
etagpair* petagpair = nullptr;
if(!pseudo_obj->AppendUploadPart(start, size, false, &petagpair)){
if(!pseudo_obj->AppendUploadPart(start, size, false, &(thargs.petagpair))){
return -EIO;
}
S3fsCurl s3fscurl(true);
return s3fscurl.MultipartUploadRequest(upload_id, path.c_str(), tgfd, start, size, petagpair);
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = FdEntity::MultipartUploadThreadWorker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Get Object Request Thread Worker");
return -EIO;
}
if(0 != thargs.result){
S3FS_PRN_ERR("Multipart Upload Request(path=%s, upload_id=%s, fd=%d, start=%lld, size=%lld) returns with error(%d)", path.c_str(), thargs.upload_id.c_str(), tgfd, static_cast<long long int>(start), static_cast<long long int>(size), thargs.result);
return thargs.result;
}
return 0;
}
// [NOTE]
// At no disk space for caching object.
// This method is finishing multipart uploading.
//
int FdEntity::NoCacheCompleteMultipartPost(PseudoFdInfo* pseudo_obj)
int FdEntity::NoCacheMultipartUploadComplete(PseudoFdInfo* pseudo_obj)
{
etaglist_t etaglist;
if(!pseudo_obj || !pseudo_obj->IsUploading() || !pseudo_obj->GetEtaglist(etaglist)){
S3FS_PRN_ERR("There is no upload id or etag list.");
return -EIO;
}
// get upload id
// get upload id and etag list
std::string upload_id;
if(!pseudo_obj->GetUploadId(upload_id)){
etaglist_t parts;
if(!pseudo_obj->GetUploadId(upload_id) || !pseudo_obj->GetEtaglist(parts)){
return -EIO;
}
S3fsCurl s3fscurl(true);
int result = s3fscurl.CompleteMultipartPostRequest(path.c_str(), upload_id, etaglist);
s3fscurl.DestroyCurlHandle();
if(0 != result){
S3fsCurl s3fscurl_abort(true);
int result2 = s3fscurl.AbortMultipartUpload(path.c_str(), upload_id);
s3fscurl_abort.DestroyCurlHandle();
if(0 != result2){
int result;
if(0 != (result = complete_multipart_upload_request(path, upload_id, parts))){
S3FS_PRN_ERR("failed to complete multipart upload by errno(%d)", result);
untreated_list.ClearAll();
pseudo_obj->ClearUploadInfo(); // clear multipart upload info
int result2;
if(0 != (result2 = abort_multipart_upload_request(path, upload_id))){
S3FS_PRN_ERR("failed to abort multipart upload by errno(%d)", result2);
}
return result;
@ -1398,6 +1478,9 @@ int FdEntity::RowFlushHasLock(int fd, const char* tpath, bool force_sync)
return result;
}
//
// ([TODO] This is a temporary modification till S3fsMultiCurl is deprecated.)
//
int FdEntity::RowFlushNoMultipart(const PseudoFdInfo* pseudo_obj, const char* tpath)
{
S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(tpath), path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd);
@ -1411,23 +1494,20 @@ int FdEntity::RowFlushNoMultipart(const PseudoFdInfo* pseudo_obj, const char* tp
return -EBADF;
}
int result;
std::string tmppath = path;
headers_t tmporgmeta = orgmeta;
// If there is no loading all of the area, loading all area.
off_t restsize = pagelist.GetTotalUnloadedPageSize();
if(0 < restsize){
// check disk space
if(!ReserveDiskSpace(restsize)){
// no enough disk space
S3FS_PRN_WARN("Not enough local storage to flush: [path=%s][pseudo_fd=%d][physical_fd=%d]", path.c_str(), pseudo_obj->GetPseudoFd(), physical_fd);
S3FS_PRN_WARN("Not enough local storage to flush: [path=%s][pseudo_fd=%d][physical_fd=%d]", (tpath ? tpath : path.c_str()), pseudo_obj->GetPseudoFd(), physical_fd);
return -ENOSPC; // No space left on device
}
}
FdManager::FreeReservedDiskSpace(restsize);
// Always load all uninitialized area
int result;
if(0 != (result = Load(/*start=*/ 0, /*size=*/ 0))){
S3FS_PRN_ERR("failed to upload all area(errno=%d)", result);
return result;
@ -1445,21 +1525,45 @@ int FdEntity::RowFlushNoMultipart(const PseudoFdInfo* pseudo_obj, const char* tp
S3FS_PRN_ERR("fstat is failed by errno(%d), but continue...", errno);
}
S3fsCurl s3fscurl(true);
result = s3fscurl.PutRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd);
// parameter for thread worker
put_req_thparam thargs;
thargs.path = tpath ? tpath : path;
thargs.meta = orgmeta; // copy
thargs.fd = physical_fd;
thargs.ahbe = true;
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = put_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Put Request for Thread Worker");
return -EIO;
}
if(0 != thargs.result){
// continue...
S3FS_PRN_DBG("Put Request(%s) returns with errno(%d)", thargs.path.c_str(), thargs.result);
}
// reset uploaded file size
size_orgmeta = st.st_size;
untreated_list.ClearAll();
if(0 == result){
if(0 == thargs.result){
pagelist.ClearAllModified();
}
return result;
return thargs.result;
}
//
// ([TODO] This is a temporary modification till S3fsMultiCurl is deprecated.)
//
int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
{
S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(tpath), path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd);
@ -1479,7 +1583,7 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
// Check rest size and free disk space
if(0 < restsize && !ReserveDiskSpace(restsize)){
// no enough disk space
if(0 != (result = NoCachePreMultipartPost(pseudo_obj))){
if(0 != (result = NoCachePreMultipartUploadRequest(pseudo_obj))){
S3FS_PRN_ERR("failed to switch multipart uploading with no cache(errno=%d)", result);
return result;
}
@ -1518,8 +1622,31 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
}else{
// normal uploading (too small part size)
S3fsCurl s3fscurl(true);
result = s3fscurl.PutRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd);
// parameter for thread worker
put_req_thparam thargs;
thargs.path = tpath ? tpath : tmppath;
thargs.meta = tmporgmeta; // copy
thargs.fd = physical_fd;
thargs.ahbe = true;
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = put_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Put Request for Thread Worker");
return -EIO;
}
if(0 != thargs.result){
// continue...
S3FS_PRN_DBG("Put Request(%s) returns with errno(%d)", (tpath ? tpath : tmppath.c_str()), thargs.result);
}
result = thargs.result;
}
// reset uploaded file size
@ -1534,15 +1661,15 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
off_t untreated_start = 0;
off_t untreated_size = 0;
if(untreated_list.GetLastUpdatedPart(untreated_start, untreated_size, S3fsCurl::GetMultipartSize(), 0) && 0 < untreated_size){
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
if(0 != (result = NoCacheMultipartUploadRequest(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart upload(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
return result;
}
untreated_list.ClearParts(untreated_start, untreated_size);
}
// complete multipart uploading.
if(0 != (result = NoCacheCompleteMultipartPost(pseudo_obj))){
S3FS_PRN_ERR("failed to complete(finish) multipart post for file(physical_fd=%d).", physical_fd);
if(0 != (result = NoCacheMultipartUploadComplete(pseudo_obj))){
S3FS_PRN_ERR("failed to complete(finish) multipart upload for file(physical_fd=%d).", physical_fd);
return result;
}
// truncate file to zero
@ -1563,6 +1690,9 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
return result;
}
//
// ([TODO] This is a temporary modification till S3fsMultiCurl is deprecated.)
//
int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
{
S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(tpath), path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd);
@ -1582,7 +1712,7 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
// Check rest size and free disk space
if(0 < restsize && !ReserveDiskSpace(restsize)){
// no enough disk space
if(0 != (result = NoCachePreMultipartPost(pseudo_obj))){
if(0 != (result = NoCachePreMultipartUploadRequest(pseudo_obj))){
S3FS_PRN_ERR("failed to switch multipart uploading with no cache(errno=%d)", result);
return result;
}
@ -1642,8 +1772,30 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
return result;
}
S3fsCurl s3fscurl(true);
result = s3fscurl.PutRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd);
// parameter for thread worker
put_req_thparam thargs;
thargs.path = tpath ? tpath : tmppath;
thargs.meta = tmporgmeta; // copy
thargs.fd = physical_fd;
thargs.ahbe = true;
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = put_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Put Request for Thread Worker");
return -EIO;
}
if(0 != thargs.result){
// continue...
S3FS_PRN_DBG("Put Request(%s) returns with errno(%d)", (tpath ? tpath : tmppath.c_str()), thargs.result);
}
result = thargs.result;
}
// reset uploaded file size
@ -1658,15 +1810,15 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
off_t untreated_start = 0;
off_t untreated_size = 0;
if(untreated_list.GetLastUpdatedPart(untreated_start, untreated_size, S3fsCurl::GetMultipartSize(), 0) && 0 < untreated_size){
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
if(0 != (result = NoCacheMultipartUploadRequest(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart upload(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
return result;
}
untreated_list.ClearParts(untreated_start, untreated_size);
}
// complete multipart uploading.
if(0 != (result = NoCacheCompleteMultipartPost(pseudo_obj))){
S3FS_PRN_ERR("failed to complete(finish) multipart post for file(physical_fd=%d).", physical_fd);
if(0 != (result = NoCacheMultipartUploadComplete(pseudo_obj))){
S3FS_PRN_ERR("failed to complete(finish) multipart upload for file(physical_fd=%d).", physical_fd);
return result;
}
// truncate file to zero
@ -1687,6 +1839,9 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
return result;
}
//
// ([TODO] This is a temporary modification till S3fsMultiCurl is deprecated.)
//
int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpath)
{
S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d][mix_upload=%s]", SAFESTRPTR(tpath), path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, (FdEntity::mixmultipart ? "true" : "false"));
@ -1713,9 +1868,30 @@ int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpat
return result;
}
headers_t tmporgmeta = orgmeta;
S3fsCurl s3fscurl(true);
result = s3fscurl.PutRequest(path.c_str(), tmporgmeta, physical_fd);
// parameter for thread worker
put_req_thparam thargs;
thargs.path = path;
thargs.meta = orgmeta; // copy
thargs.fd = physical_fd;
thargs.ahbe = true;
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = put_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Put Request for Thread Worker");
return -EIO;
}
if(0 != thargs.result){
// continue...
S3FS_PRN_DBG("Put Request(%s) returns with errno(%d)", path.c_str(), thargs.result);
}
result = thargs.result;
// reset uploaded file size
size_orgmeta = st.st_size;
@ -1785,19 +1961,9 @@ int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpat
//
// Multipart uploading hasn't started yet, so start it.
//
S3fsCurl s3fscurl(true);
std::string upload_id;
if(0 != (result = s3fscurl.PreMultipartPostRequest(path.c_str(), orgmeta, upload_id))){
S3FS_PRN_ERR("failed to setup multipart upload(create upload id) by errno(%d)", result);
if(0 != (result = PreMultipartUploadRequest(pseudo_obj))){
return result;
}
if(!pseudo_obj->InitialUploadInfo(upload_id)){
S3FS_PRN_ERR("failed to setup multipart upload(set upload id to object)");
return -EIO;
}
// Clear the dirty flag, because the meta data is updated.
pending_status = pending_status_t::NO_UPDATE_PENDING;
}
//
@ -1851,30 +2017,26 @@ int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpat
// Complete uploading
//
std::string upload_id;
etaglist_t etaglist;
if(!pseudo_obj->GetUploadId(upload_id) || !pseudo_obj->GetEtaglist(etaglist)){
etaglist_t parts;
if(!pseudo_obj->GetUploadId(upload_id) || !pseudo_obj->GetEtaglist(parts)){
S3FS_PRN_ERR("There is no upload id or etag list.");
untreated_list.ClearAll();
pseudo_obj->ClearUploadInfo(); // clear multipart upload info
return -EIO;
}else{
S3fsCurl s3fscurl(true);
result = s3fscurl.CompleteMultipartPostRequest(path.c_str(), upload_id, etaglist);
s3fscurl.DestroyCurlHandle();
if(0 != result){
if(0 != (result = complete_multipart_upload_request(path, upload_id, parts))){
S3FS_PRN_ERR("failed to complete multipart upload by errno(%d)", result);
untreated_list.ClearAll();
pseudo_obj->ClearUploadInfo(); // clear multipart upload info
S3fsCurl s3fscurl_abort(true);
int result2 = s3fscurl.AbortMultipartUpload(path.c_str(), upload_id);
s3fscurl_abort.DestroyCurlHandle();
if(0 != result2){
int result2;
if(0 != (result2 = abort_multipart_upload_request(path, upload_id))){
S3FS_PRN_ERR("failed to abort multipart upload by errno(%d)", result2);
}
return result;
}
}
untreated_list.ClearAll();
pseudo_obj->ClearUploadInfo(); // clear multipart upload info
@ -2126,7 +2288,7 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of
S3FS_PRN_WARN("Not enough local storage to cache write request till multipart upload can start: [path=%s][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), physical_fd, static_cast<long long int>(start), size);
return -ENOSPC; // No space left on device
}
if(0 != (result = NoCachePreMultipartPost(pseudo_obj))){
if(0 != (result = NoCachePreMultipartUploadRequest(pseudo_obj))){
S3FS_PRN_ERR("failed to switch multipart uploading with no cache(errno=%d)", result);
return result;
}
@ -2168,8 +2330,8 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of
off_t untreated_size = 0;
if(untreated_list.GetLastUpdatedPart(untreated_start, untreated_size, S3fsCurl::GetMultipartSize())){
// when multipart max size is reached
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
if(0 != (result = NoCacheMultipartUploadRequest(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart upload(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
return result;
}
@ -2210,7 +2372,7 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes,
S3FS_PRN_WARN("Not enough local storage to cache write request till multipart upload can start: [path=%s][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), physical_fd, static_cast<long long int>(start), size);
return -ENOSPC; // No space left on device
}
if(0 != (result = NoCachePreMultipartPost(pseudo_obj))){
if(0 != (result = NoCachePreMultipartUploadRequest(pseudo_obj))){
S3FS_PRN_ERR("failed to switch multipart uploading with no cache(errno=%d)", result);
return result;
}
@ -2243,8 +2405,8 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes,
off_t untreated_size = 0;
if(untreated_list.GetLastUpdatedPart(untreated_start, untreated_size, S3fsCurl::GetMultipartSize())){
// when multipart max size is reached
if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
if(0 != (result = NoCacheMultipartUploadRequest(pseudo_obj, physical_fd, untreated_start, untreated_size))){
S3FS_PRN_ERR("failed to multipart upload(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast<long long int>(untreated_start), static_cast<long long int>(untreated_size), physical_fd);
return result;
}

View File

@ -80,6 +80,7 @@ class FdEntity : public std::enable_shared_from_this<FdEntity>
private:
static int FillFile(int fd, unsigned char byte, off_t size, off_t start);
static ino_t GetInode(int fd);
static void* MultipartUploadThreadWorker(void* arg); // ([TODO] This is a temporary method that will be moved when S3fsMultiCurl is deprecated.)
void Clear();
ino_t GetInode() const REQUIRES(FdEntity::fdent_data_lock);
@ -89,9 +90,10 @@ class FdEntity : public std::enable_shared_from_this<FdEntity>
bool IsUploading() REQUIRES(FdEntity::fdent_lock);
bool SetAllStatus(bool is_loaded) REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock);
bool SetAllStatusUnloaded() REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock) { return SetAllStatus(false); }
int NoCachePreMultipartPost(PseudoFdInfo* pseudo_obj) REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock);
int NoCacheMultipartPost(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size) REQUIRES(FdEntity::fdent_lock);
int NoCacheCompleteMultipartPost(PseudoFdInfo* pseudo_obj) REQUIRES(FdEntity::fdent_lock);
int PreMultipartUploadRequest(PseudoFdInfo* pseudo_obj) REQUIRES(FdEntity::fdent_lock, fdent_data_lock);
int NoCachePreMultipartUploadRequest(PseudoFdInfo* pseudo_obj) REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock);
int NoCacheMultipartUploadRequest(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size) REQUIRES(FdEntity::fdent_lock);
int NoCacheMultipartUploadComplete(PseudoFdInfo* pseudo_obj) REQUIRES(FdEntity::fdent_lock);
int RowFlushHasLock(int fd, const char* tpath, bool force_sync) REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock);
int RowFlushNoMultipart(const PseudoFdInfo* pseudo_obj, const char* tpath) REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock);
int RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock);

View File

@ -37,16 +37,37 @@
#include "curl.h"
#include "string_util.h"
#include "threadpoolman.h"
#include "s3fs_threadreqs.h"
//------------------------------------------------
// Structure of parameters to pass to thread
//------------------------------------------------
// [NOTE]
// The processing related to this is currently temporarily implemented
// in this file, but will be moved to a separate file later.
//
struct pseudofdinfo_mpupload_thparam
{
PseudoFdInfo* ppseudofdinfo = nullptr;
std::string path;
std::string upload_id;
int upload_fd = -1;
off_t start = 0;
off_t size = 0;
bool is_copy = false;
int part_num = -1;
etagpair* petag = nullptr;
};
//------------------------------------------------
// PseudoFdInfo class methods
//------------------------------------------------
//
// Worker function for uploading
// Thread Worker function for uploading
//
void* PseudoFdInfo::MultipartUploadThreadWorker(void* arg)
{
std::unique_ptr<pseudofdinfo_thparam> pthparam(static_cast<pseudofdinfo_thparam*>(arg));
std::unique_ptr<pseudofdinfo_mpupload_thparam> pthparam(static_cast<pseudofdinfo_mpupload_thparam*>(arg));
if(!pthparam || !(pthparam->ppseudofdinfo)){
return reinterpret_cast<void*>(-EIO);
}
@ -82,7 +103,7 @@ void* PseudoFdInfo::MultipartUploadThreadWorker(void* arg)
// Send request and get result
if(0 == (result = s3fscurl->RequestPerform())){
S3FS_PRN_DBG("succeed uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
if(!s3fscurl->MixMultipartPostComplete()){
if(!s3fscurl->MixMultipartUploadComplete()){
S3FS_PRN_ERR("failed completion uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->part_num);
result = -EIO;
}
@ -448,7 +469,7 @@ bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_
}
// make parameter for my thread
auto* thargs = new pseudofdinfo_thparam;
auto* thargs = new pseudofdinfo_mpupload_thparam;
thargs->ppseudofdinfo = this;
thargs->path = SAFESTRPTR(path);
thargs->upload_id = tmp_upload_id;
@ -497,6 +518,30 @@ bool PseudoFdInfo::ParallelMultipartUploadAll(const char* path, const mp_part_li
return true;
}
//
// Common method that calls S3fsCurl::PreMultipartUploadRequest via pre_multipart_upload_request
//
// [NOTE]
// If the request is successful, initialize upload_id.
//
bool PseudoFdInfo::PreMultipartUploadRequest(const std::string& strpath, const headers_t& meta)
{
// get upload_id
std::string new_upload_id;
if(0 != pre_multipart_upload_request(strpath, meta, new_upload_id)){
return false;
}
// reset upload_id
if(!RowInitialUploadInfo(new_upload_id, false/* not need to cancel */)){
S3FS_PRN_ERR("failed to setup multipart upload(set upload id to object)");
return false;
}
S3FS_PRN_DBG("succeed to setup multipart upload(set upload id to object)");
return true;
}
//
// Upload the last updated Untreated area
//
@ -579,18 +624,9 @@ ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_
// Has multipart uploading already started?
//
if(!IsUploading()){
// Multipart uploading hasn't started yet, so start it.
//
S3fsCurl s3fscurl(true);
std::string tmp_upload_id;
int result;
if(0 != (result = s3fscurl.PreMultipartPostRequest(path, meta, tmp_upload_id))){
S3FS_PRN_ERR("failed to setup multipart upload(create upload id) by errno(%d)", result);
return result;
}
if(!RowInitialUploadInfo(tmp_upload_id, false/* not need to cancel */)){
S3FS_PRN_ERR("failed to setup multipart upload(set upload id to object)");
return result;
std::string strpath = SAFESTRPTR(path);
if(!PreMultipartUploadRequest(strpath, meta)){
return -EIO;
}
}

View File

@ -33,24 +33,6 @@
class UntreatedParts;
//------------------------------------------------
// Structure of parameters to pass to thread
//------------------------------------------------
class PseudoFdInfo;
struct pseudofdinfo_thparam
{
PseudoFdInfo* ppseudofdinfo = nullptr;
std::string path;
std::string upload_id;
int upload_fd = -1;
off_t start = 0;
off_t size = 0;
bool is_copy = false;
int part_num = -1;
etagpair* petag = nullptr;
};
//------------------------------------------------
// Class PseudoFdInfo
//------------------------------------------------
@ -83,6 +65,7 @@ class PseudoFdInfo
bool GetUploadInfo(std::string& id, int& fd) const;
bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy);
bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag);
bool PreMultipartUploadRequest(const std::string& strpath, const headers_t& meta);
bool CancelAllThreads();
bool ExtractUploadPartsFromUntreatedArea(off_t untreated_start, off_t untreated_size, mp_part_list_t& to_upload_list, filepart_list_t& cancel_upload_list, off_t max_mp_size);
bool IsUploadingHasLock() const REQUIRES(upload_list_lock);

View File

@ -28,6 +28,7 @@
#include "s3fs_xml.h"
#include "s3fs_auth.h"
#include "string_util.h"
#include "s3fs_threadreqs.h"
//-------------------------------------------------------------------
// Global variables
@ -68,7 +69,6 @@ static bool abort_incomp_mpu_list(const incomp_mpu_list_t& list, time_t abort_ti
time_t now_time = time(nullptr);
// do removing.
S3fsCurl s3fscurl;
bool result = true;
for(auto iter = list.cbegin(); iter != list.cend(); ++iter){
const char* tpath = (*iter).key.c_str();
@ -85,15 +85,12 @@ static bool abort_incomp_mpu_list(const incomp_mpu_list_t& list, time_t abort_ti
}
}
if(0 != s3fscurl.AbortMultipartUpload(tpath, upload_id)){
if(0 != abort_multipart_upload_request(std::string(tpath), upload_id)){
S3FS_PRN_EXIT("Failed to remove %s multipart uploading object.", tpath);
result = false;
}else{
printf("Succeed to remove %s multipart uploading object.\n", tpath);
}
// reset(initialize) curl object
s3fscurl.DestroyCurlHandle();
}
return result;
}

View File

@ -54,6 +54,7 @@
#include "s3fs_cred.h"
#include "s3fs_help.h"
#include "s3fs_util.h"
#include "s3fs_threadreqs.h"
#include "mpu_util.h"
#include "threadpoolman.h"
@ -303,7 +304,7 @@ static bool is_special_name_folder_object(const char* path)
}
std::string strpath = path;
headers_t header;
headers_t header;
if(std::string::npos == strpath.find("_$folder$", 0)){
if('/' == *strpath.rbegin()){
@ -311,12 +312,15 @@ static bool is_special_name_folder_object(const char* path)
}
strpath += "_$folder$";
}
S3fsCurl s3fscurl;
if(0 != s3fscurl.HeadRequest(strpath.c_str(), header)){
// send request
if(0 != head_request(strpath, header)){
return false;
}
header.clear();
S3FS_MALLOCTRIM(0);
return true;
}
@ -414,9 +418,8 @@ static int chk_dir_object_type(const char* path, std::string& newpath, std::stri
static int remove_old_type_dir(const std::string& path, dirtype type)
{
if(IS_RMTYPEDIR(type)){
S3fsCurl s3fscurl;
int result = s3fscurl.DeleteRequest(path.c_str());
if(0 != result && -ENOENT != result){
int result;
if(0 != (result = delete_request(path))){
return result;
}
// succeed removing or not found the directory
@ -453,7 +456,6 @@ static int get_object_attribute(const char* path, struct stat* pstbuf, headers_t
headers_t tmpHead;
headers_t* pheader = pmeta ? pmeta : &tmpHead;
std::string strpath;
S3fsCurl s3fscurl;
bool forcedir = false;
bool is_mountpoint = false; // path is the mount point
bool is_bucket_mountpoint = false; // path is the mount point which is the bucket root
@ -519,8 +521,9 @@ static int get_object_attribute(const char* path, struct stat* pstbuf, headers_t
}else{
strpath = path;
}
result = s3fscurl.HeadRequest(strpath.c_str(), (*pheader));
s3fscurl.DestroyCurlHandle();
// get headers
result = head_request(strpath, *pheader);
// if not found target path object, do over checking
if(-EPERM == result){
@ -548,22 +551,26 @@ static int get_object_attribute(const char* path, struct stat* pstbuf, headers_t
if('/' != *strpath.rbegin() && std::string::npos == strpath.find("_$folder$", 0)){
// now path is "object", do check "object/" for over checking
strpath += "/";
result = s3fscurl.HeadRequest(strpath.c_str(), (*pheader));
s3fscurl.DestroyCurlHandle();
// re-get headers
result = head_request(strpath, *pheader);
}
if(support_compat_dir && 0 != result){
// now path is "object/", do check "object_$folder$" for over checking
strpath.erase(strpath.length() - 1);
strpath += "_$folder$";
result = s3fscurl.HeadRequest(strpath.c_str(), (*pheader));
s3fscurl.DestroyCurlHandle();
if(0 != result){
// cut "_$folder$" for over checking "no dir object" after here
if(std::string::npos != (Pos = strpath.find("_$folder$", 0))){
strpath.erase(Pos);
}
}
// re-get headers
result = head_request(strpath, *pheader);
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress knownConditionTrueFalse
if(0 != result){
// cut "_$folder$" for over checking "no dir object" after here
if(std::string::npos != (Pos = strpath.find("_$folder$", 0))){
strpath.erase(Pos);
}
}
}
}
if(0 != result && std::string::npos == strpath.find("_$folder$", 0)){
@ -907,7 +914,6 @@ static int get_local_fent(AutoFdEntity& autoent, FdEntity **entity, const char*
int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_size)
{
int result;
S3fsCurl s3fscurl(true);
off_t size;
std::string strpath;
@ -932,11 +938,16 @@ int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_siz
}
if(!nocopyapi && !nomultipart && size >= multipart_threshold){
// [TODO]
// This object will be removed after removing S3fsMultiCurl
//
S3fsCurl s3fscurl(true);
if(0 != (result = s3fscurl.MultipartHeadRequest(strpath.c_str(), size, meta))){
return result;
}
}else{
if(0 != (result = s3fscurl.PutHeadRequest(strpath.c_str(), meta, is_copy))){
// send put head request
if(0 != (result = put_head_request(strpath, meta, is_copy))){
return result;
}
}
@ -1060,8 +1071,11 @@ static int create_file_object(const char* path, mode_t mode, uid_t uid, gid_t gi
meta["x-amz-meta-ctime"] = strnow;
meta["x-amz-meta-mtime"] = strnow;
S3fsCurl s3fscurl(true);
return s3fscurl.PutRequest(path, meta, -1); // fd=-1 means for creating zero byte object.
int result;
if(0 != (result = put_request(std::string(SAFESTRPTR(path)), meta, -1, true/* ahbe */))){
return result;
}
return 0;
}
static int s3fs_mknod(const char *_path, mode_t mode, dev_t rdev)
@ -1188,8 +1202,11 @@ static int create_directory_object(const char* path, mode_t mode, const struct t
meta["x-amz-meta-xattr"] = pxattrvalue;
}
S3fsCurl s3fscurl;
return s3fscurl.PutRequest(tpath.c_str(), meta, -1); // fd=-1 means for creating zero byte object.
int result;
if(0 != (result = put_request(tpath, meta, -1, false/* ahbe */))){
return result;
}
return 0;
}
static int s3fs_mkdir(const char* _path, mode_t mode)
@ -1250,8 +1267,11 @@ static int s3fs_unlink(const char* _path)
if(0 != (result = check_parent_object_access(path, W_OK | X_OK))){
return result;
}
S3fsCurl s3fscurl;
result = s3fscurl.DeleteRequest(path);
if(0 != (result = delete_request(std::string(SAFESTRPTR(path))))){
return result;
}
StatCache::getStatCacheData()->DelStat(path);
StatCache::getStatCacheData()->DelSymlink(path);
FdManager::DeleteCacheFile(path);
@ -1304,9 +1324,10 @@ static int s3fs_rmdir(const char* _path)
if('/' != *strpath.rbegin()){
strpath += "/";
}
S3fsCurl s3fscurl;
result = s3fscurl.DeleteRequest(strpath.c_str());
s3fscurl.DestroyCurlHandle();
// delete request
result = delete_request(strpath);
StatCache::getStatCacheData()->DelStat(strpath);
// double check for old version(before 1.63)
@ -1320,8 +1341,9 @@ static int s3fs_rmdir(const char* _path)
if(0 == get_object_attribute(strpath.c_str(), &stbuf, nullptr, false)){
if(S_ISDIR(stbuf.st_mode)){
// Found "dir" object.
result = s3fscurl.DeleteRequest(strpath.c_str());
s3fscurl.DestroyCurlHandle();
// delete request
result = delete_request(strpath);
StatCache::getStatCacheData()->DelStat(strpath);
}
}
@ -1332,7 +1354,9 @@ static int s3fs_rmdir(const char* _path)
// This processing is necessary for other S3 clients compatibility.
if(is_special_name_folder_object(strpath.c_str())){
strpath += "_$folder$";
result = s3fscurl.DeleteRequest(strpath.c_str());
// delete request
result = delete_request(strpath);
}
// update parent directory timestamp
@ -1598,6 +1622,9 @@ static int rename_large_object(const char* from, const char* to)
return result;
}
// [TODO]
// This object will be removed after removing S3fsMultiCurl
//
S3fsCurl s3fscurl(true);
if(0 != (result = s3fscurl.MultipartRenameRequest(from, to, meta, buf.st_size))){
return result;
@ -3121,6 +3148,9 @@ static int s3fs_opendir(const char* _path, struct fuse_file_info* fi)
return result;
}
// [TODO]
// This function's argument(s3fscurl) will be checked and changed after removing S3fsMultiCurl
//
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress constParameterCallback
static bool multi_head_callback(S3fsCurl* s3fscurl, void* param)
@ -3163,6 +3193,9 @@ struct multi_head_notfound_callback_param
s3obj_list_t notfound_list;
};
// [TODO]
// This function's argument(s3fscurl) will be checked and changed after removing S3fsMultiCurl
//
static bool multi_head_notfound_callback(S3fsCurl* s3fscurl, void* param)
{
if(!s3fscurl){
@ -3184,6 +3217,9 @@ static bool multi_head_notfound_callback(S3fsCurl* s3fscurl, void* param)
return true;
}
// [TODO]
// This function's argument(s3fscurl) will be checked and changed after removing S3fsMultiCurl
//
static std::unique_ptr<S3fsCurl> multi_head_retry_callback(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
@ -3220,6 +3256,9 @@ static std::unique_ptr<S3fsCurl> multi_head_retry_callback(S3fsCurl* s3fscurl)
static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf, fuse_fill_dir_t filler)
{
// [TODO]
// This will be checked and changed after removing S3fsMultiCurl
//
S3fsMultiCurl curlmulti(S3fsCurl::GetMaxMultiRequest(), true); // [NOTE] run all requests to completion even if some requests fail.
s3obj_list_t headlist;
int result = 0;
@ -3393,7 +3432,6 @@ static int list_bucket(const char* path, S3ObjList& head, const char* delimiter,
std::string next_continuation_token;
std::string next_marker;
bool truncated = true;
S3fsCurl s3fscurl;
S3FS_PRN_INFO1("[path=%s]", path);
@ -3420,10 +3458,13 @@ static int list_bucket(const char* path, S3ObjList& head, const char* delimiter,
}
while(truncated){
// append parameters to query in alphabetical order
int result;
std::string each_query;
std::string responseBody;
// append parameters to query in alphabetical order
if(!next_continuation_token.empty()){
each_query += "continuation-token=" + urlEncodePath(next_continuation_token) + "&";
each_query += "continuation-token=" + urlEncodePath(next_continuation_token) + "&";
next_continuation_token = "";
}
each_query += query_delimiter;
@ -3437,20 +3478,17 @@ static int list_bucket(const char* path, S3ObjList& head, const char* delimiter,
each_query += query_maxkey;
each_query += query_prefix;
// request
int result;
if(0 != (result = s3fscurl.ListBucketRequest(path, each_query.c_str()))){
S3FS_PRN_ERR("ListBucketRequest returns with error.");
// send request
if(0 != (result = list_bucket_request(std::string(SAFESTRPTR(path)), each_query, responseBody))){
return result;
}
const std::string& body = s3fscurl.GetBodyData();
// [NOTE]
// CR code(\r) is replaced with LF(\n) by xmlReadMemory() function.
// To prevent that, only CR code is encoded by following function.
// The encoded CR code is decoded with append_objects_from_xml(_ex).
//
std::string encbody = get_encoded_cr_code(body.c_str());
std::string encbody = get_encoded_cr_code(responseBody.c_str());
// xmlDocPtr
std::unique_ptr<xmlDoc, decltype(&xmlFreeDoc)> doc(xmlReadMemory(encbody.c_str(), static_cast<int>(encbody.size()), "", nullptr, 0), xmlFreeDoc);
@ -3488,9 +3526,6 @@ static int list_bucket(const char* path, S3ObjList& head, const char* delimiter,
}
}
// reset(initialize) curl object
s3fscurl.DestroyCurlHandle();
if(check_content_only){
break;
}
@ -4211,6 +4246,11 @@ static void* s3fs_init(struct fuse_conn_info* conn)
S3FS_PRN_DBG("Could not initialize cache directory.");
}
if(!ThreadPoolMan::Initialize(max_thread_count)){
S3FS_PRN_CRIT("Could not create thread pool(%d)", max_thread_count);
s3fs_exit_fuseloop(EXIT_FAILURE);
}
// check loading IAM role name
if(!ps3fscred->LoadIAMRoleFromMetaData()){
S3FS_PRN_CRIT("could not load IAM role name from meta data.");
@ -4238,11 +4278,6 @@ static void* s3fs_init(struct fuse_conn_info* conn)
conn->want |= FUSE_CAP_BIG_WRITES;
}
if(!ThreadPoolMan::Initialize(max_thread_count)){
S3FS_PRN_CRIT("Could not create thread pool(%d)", max_thread_count);
s3fs_exit_fuseloop(EXIT_FAILURE);
}
// Signal object
if(!S3fsSignals::Initialize()){
S3FS_PRN_ERR("Failed to initialize signal object, but continue...");
@ -4385,124 +4420,118 @@ static int s3fs_check_service()
return EXIT_FAILURE;
}
S3fsCurl s3fscurl;
bool force_no_sse = false;
bool forceNoSSE = false; // it is not mandatory at first.
while(0 > s3fscurl.CheckBucket(get_realpath("/").c_str(), support_compat_dir, force_no_sse)){
// get response code
bool do_retry = false;
long responseCode = s3fscurl.GetLastResponseCode();
for(bool isLoop = true; isLoop; ){
long responseCode = S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET;
std::string responseBody;
if(0 > check_service_request(get_realpath("/"), forceNoSSE, support_compat_dir, responseCode, responseBody)){
// check wrong endpoint, and automatically switch endpoint
if(300 <= responseCode && responseCode < 500){
// check region error(for putting message or retrying)
std::string expectregion;
std::string expectendpoint;
// check wrong endpoint, and automatically switch endpoint
if(300 <= responseCode && responseCode < 500){
// Check if any case can be retried
if(check_region_error(responseBody.c_str(), responseBody.size(), expectregion)){
// [NOTE]
// If endpoint is not specified(using us-east-1 region) and
// an error is encountered accessing a different region, we
// will retry the check on the expected region.
// see) https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingBucket.html#access-bucket-intro
//
if(s3host != "http://s3.amazonaws.com" && s3host != "https://s3.amazonaws.com"){
// specified endpoint for specified url is wrong.
if(is_specified_endpoint){
S3FS_PRN_CRIT("The bucket region is not '%s'(specified) for specified url(%s), it is correctly '%s'. You should specify url(http(s)://s3-%s.amazonaws.com) and endpoint(%s) option.", endpoint.c_str(), s3host.c_str(), expectregion.c_str(), expectregion.c_str(), expectregion.c_str());
}else{
S3FS_PRN_CRIT("The bucket region is not '%s'(default) for specified url(%s), it is correctly '%s'. You should specify url(http(s)://s3-%s.amazonaws.com) and endpoint(%s) option.", endpoint.c_str(), s3host.c_str(), expectregion.c_str(), expectregion.c_str(), expectregion.c_str());
}
isLoop = false;
// check region error(for putting message or retrying)
const std::string& body = s3fscurl.GetBodyData();
std::string expectregion;
std::string expectendpoint;
}else if(is_specified_endpoint){
// specified endpoint is wrong.
S3FS_PRN_CRIT("The bucket region is not '%s'(specified), it is correctly '%s'. You should specify endpoint(%s) option.", endpoint.c_str(), expectregion.c_str(), expectregion.c_str());
isLoop = false;
}else if(S3fsCurl::GetSignatureType() == signature_type_t::V4_ONLY || S3fsCurl::GetSignatureType() == signature_type_t::V2_OR_V4){
// current endpoint and url are default value, so try to connect to expected region.
S3FS_PRN_CRIT("Failed to connect region '%s'(default), so retry to connect region '%s' for url(http(s)://s3-%s.amazonaws.com).", endpoint.c_str(), expectregion.c_str(), expectregion.c_str());
// change endpoint
endpoint = expectregion;
// change url
if(s3host == "http://s3.amazonaws.com"){
s3host = "http://s3-" + endpoint + ".amazonaws.com";
}else if(s3host == "https://s3.amazonaws.com"){
s3host = "https://s3-" + endpoint + ".amazonaws.com";
}
// Check if any case can be retried
if(check_region_error(body.c_str(), body.size(), expectregion)){
// [NOTE]
// If endpoint is not specified(using us-east-1 region) and
// an error is encountered accessing a different region, we
// will retry the check on the expected region.
// see) https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingBucket.html#access-bucket-intro
//
if(s3host != "http://s3.amazonaws.com" && s3host != "https://s3.amazonaws.com"){
// specified endpoint for specified url is wrong.
if(is_specified_endpoint){
S3FS_PRN_CRIT("The bucket region is not '%s'(specified) for specified url(%s), it is correctly '%s'. You should specify url(http(s)://s3-%s.amazonaws.com) and endpoint(%s) option.", endpoint.c_str(), s3host.c_str(), expectregion.c_str(), expectregion.c_str(), expectregion.c_str());
}else{
S3FS_PRN_CRIT("The bucket region is not '%s'(default) for specified url(%s), it is correctly '%s'. You should specify url(http(s)://s3-%s.amazonaws.com) and endpoint(%s) option.", endpoint.c_str(), s3host.c_str(), expectregion.c_str(), expectregion.c_str(), expectregion.c_str());
S3FS_PRN_CRIT("The bucket region is not '%s'(default), it is correctly '%s'. You should specify endpoint(%s) option.", endpoint.c_str(), expectregion.c_str(), expectregion.c_str());
isLoop = false;
}
}else if(is_specified_endpoint){
// specified endpoint is wrong.
S3FS_PRN_CRIT("The bucket region is not '%s'(specified), it is correctly '%s'. You should specify endpoint(%s) option.", endpoint.c_str(), expectregion.c_str(), expectregion.c_str());
}else if(S3fsCurl::GetSignatureType() == signature_type_t::V4_ONLY || S3fsCurl::GetSignatureType() == signature_type_t::V2_OR_V4){
// current endpoint and url are default value, so try to connect to expected region.
S3FS_PRN_CRIT("Failed to connect region '%s'(default), so retry to connect region '%s' for url(http(s)://s3-%s.amazonaws.com).", endpoint.c_str(), expectregion.c_str(), expectregion.c_str());
// change endpoint
endpoint = expectregion;
// change url
if(s3host == "http://s3.amazonaws.com"){
s3host = "http://s3-" + endpoint + ".amazonaws.com";
}else if(s3host == "https://s3.amazonaws.com"){
s3host = "https://s3-" + endpoint + ".amazonaws.com";
}else if(check_endpoint_error(responseBody.c_str(), responseBody.size(), expectendpoint)){
// redirect error
if(pathrequeststyle){
S3FS_PRN_CRIT("S3 service returned PermanentRedirect (current is url(%s) and endpoint(%s)). You need to specify correct url(http(s)://s3-<endpoint>.amazonaws.com) and endpoint option with use_path_request_style option.", s3host.c_str(), endpoint.c_str());
}else{
S3FS_PRN_CRIT("S3 service returned PermanentRedirect with %s (current is url(%s) and endpoint(%s)). You need to specify correct endpoint option.", expectendpoint.c_str(), s3host.c_str(), endpoint.c_str());
}
return EXIT_FAILURE;
// Retry with changed host
s3fscurl.DestroyCurlHandle();
do_retry = true;
}else if(check_invalid_sse_arg_error(responseBody.c_str(), responseBody.size())){
// SSE argument error, so retry it without SSE
S3FS_PRN_CRIT("S3 service returned InvalidArgument(x-amz-server-side-encryption), so retry without adding x-amz-server-side-encryption.");
}else{
S3FS_PRN_CRIT("The bucket region is not '%s'(default), it is correctly '%s'. You should specify endpoint(%s) option.", endpoint.c_str(), expectregion.c_str(), expectregion.c_str());
// Retry without sse parameters
forceNoSSE = true;
}
}
}else if(check_endpoint_error(body.c_str(), body.size(), expectendpoint)){
// redirect error
if(pathrequeststyle){
S3FS_PRN_CRIT("S3 service returned PermanentRedirect (current is url(%s) and endpoint(%s)). You need to specify correct url(http(s)://s3-<endpoint>.amazonaws.com) and endpoint option with use_path_request_style option.", s3host.c_str(), endpoint.c_str());
// Try changing signature from v4 to v2
//
// [NOTE]
// If there is no case to retry with the previous checks, and there
// is a chance to retry with signature v2, prepare to retry with v2.
//
if(!isLoop && (responseCode == 400 || responseCode == 403) && S3fsCurl::GetSignatureType() == signature_type_t::V2_OR_V4){
// switch sigv2
S3FS_PRN_CRIT("Failed to connect by sigv4, so retry to connect by signature version 2. But you should to review url and endpoint option.");
// retry to check with sigv2
isLoop = true;
S3fsCurl::SetSignatureType(signature_type_t::V2_ONLY);
}
// check errors(after retrying)
if(!isLoop && responseCode != 200 && responseCode != 301){
// parse error message if existed
std::string errMessage;
check_error_message(responseBody.c_str(), responseBody.size(), errMessage);
if(responseCode == 400){
S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Bad Request(host=%s, message=%s)", s3host.c_str(), errMessage.c_str());
}else if(responseCode == 403){
S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Invalid Credentials(host=%s, message=%s)", s3host.c_str(), errMessage.c_str());
}else if(responseCode == 404){
if(mount_prefix.empty()){
S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Bucket or directory not found(host=%s, message=%s)", s3host.c_str(), errMessage.c_str());
}else{
S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Bucket or directory(%s) not found(host=%s, message=%s) - You may need to specify the compat_dir option.", mount_prefix.c_str(), s3host.c_str(), errMessage.c_str());
}
}else{
S3FS_PRN_CRIT("S3 service returned PermanentRedirect with %s (current is url(%s) and endpoint(%s)). You need to specify correct endpoint option.", expectendpoint.c_str(), s3host.c_str(), endpoint.c_str());
S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Unable to connect(host=%s, message=%s)", s3host.c_str(), errMessage.c_str());
}
return EXIT_FAILURE;
}else if(check_invalid_sse_arg_error(body.c_str(), body.size())){
// SSE argument error, so retry it without SSE
S3FS_PRN_CRIT("S3 service returned InvalidArgument(x-amz-server-side-encryption), so retry without adding x-amz-server-side-encryption.");
// Retry without sse parameters
s3fscurl.DestroyCurlHandle();
do_retry = true;
force_no_sse = true;
}
}
// Try changing signature from v4 to v2
//
// [NOTE]
// If there is no case to retry with the previous checks, and there
// is a chance to retry with signature v2, prepare to retry with v2.
//
if(!do_retry && (responseCode == 400 || responseCode == 403) && S3fsCurl::GetSignatureType() == signature_type_t::V2_OR_V4){
// switch sigv2
S3FS_PRN_CRIT("Failed to connect by sigv4, so retry to connect by signature version 2. But you should to review url and endpoint option.");
// retry to check with sigv2
s3fscurl.DestroyCurlHandle();
do_retry = true;
S3fsCurl::SetSignatureType(signature_type_t::V2_ONLY);
}
// check errors(after retrying)
if(!do_retry && responseCode != 200 && responseCode != 301){
// parse error message if existed
std::string errMessage;
const std::string& body = s3fscurl.GetBodyData();
check_error_message(body.c_str(), body.size(), errMessage);
if(responseCode == 400){
S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Bad Request(host=%s, message=%s)", s3host.c_str(), errMessage.c_str());
}else if(responseCode == 403){
S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Invalid Credentials(host=%s, message=%s)", s3host.c_str(), errMessage.c_str());
}else if(responseCode == 404){
if(mount_prefix.empty()){
S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Bucket or directory not found(host=%s, message=%s)", s3host.c_str(), errMessage.c_str());
}else{
S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Bucket or directory(%s) not found(host=%s, message=%s) - You may need to specify the compat_dir option.", mount_prefix.c_str(), s3host.c_str(), errMessage.c_str());
}
}else{
S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Unable to connect(host=%s, message=%s)", s3host.c_str(), errMessage.c_str());
}
return EXIT_FAILURE;
}else{
// break loop
isLoop = false;
}
}
s3fscurl.DestroyCurlHandle();
// make sure remote mountpath exists and is a directory
if(!mount_prefix.empty()){

View File

@ -35,6 +35,8 @@
#include "curl.h"
#include "string_util.h"
#include "metaheader.h"
#include "threadpoolman.h"
#include "s3fs_threadreqs.h"
//-------------------------------------------------------------------
// Symbols
@ -290,7 +292,7 @@ bool S3fsCred::SetIAMRole(const char* role)
return true;
}
const std::string& S3fsCred::GetIAMRole() const
const std::string& S3fsCred::GetIAMRoleHasLock() const
{
return IAM_role;
}
@ -336,7 +338,7 @@ bool S3fsCred::GetIAMCredentialsURL(std::string& url, bool check_iam_role)
S3FS_PRN_ERR("IAM role name is empty.");
return false;
}
S3FS_PRN_INFO3("[IAM role=%s]", GetIAMRole().c_str());
S3FS_PRN_INFO3("[IAM role=%s]", GetIAMRoleHasLock().c_str());
}
if(is_ecs){
@ -356,15 +358,15 @@ bool S3fsCred::GetIAMCredentialsURL(std::string& url, bool check_iam_role)
// in the S3fsCurl::GetIAMv2ApiToken method (when retrying).
//
if(GetIMDSVersion() > 1){
S3fsCurl s3fscurl;
std::string token;
int result = s3fscurl.GetIAMv2ApiToken(S3fsCred::IAMv2_token_url, S3fsCred::IAMv2_token_ttl, S3fsCred::IAMv2_token_ttl_hdr, token);
int result = get_iamv2api_token_request(std::string(S3fsCred::IAMv2_token_url), S3fsCred::IAMv2_token_ttl, std::string(S3fsCred::IAMv2_token_ttl_hdr), token);
if(-ENOENT == result){
// If we get a 404 back when requesting the token service,
// then it's highly likely we're running in an environment
// that doesn't support the AWS IMDSv2 API, so we'll skip
// the token retrieval in the future.
SetIMDSVersion(1);
SetIMDSVersionHasLock(1);
}else if(result != 0){
// If we get an unexpected error when retrieving the API
@ -375,13 +377,13 @@ bool S3fsCred::GetIAMCredentialsURL(std::string& url, bool check_iam_role)
}else{
// Set token
if(!SetIAMv2APIToken(token)){
if(!SetIAMv2APITokenHasLock(token)){
S3FS_PRN_ERR("Error storing IMDSv2 API token(%s).", token.c_str());
}
}
}
if(check_iam_role){
url = IAM_cred_url + GetIAMRole();
url = IAM_cred_url + GetIAMRoleHasLock();
}else{
url = IAM_cred_url;
}
@ -389,7 +391,7 @@ bool S3fsCred::GetIAMCredentialsURL(std::string& url, bool check_iam_role)
return true;
}
int S3fsCred::SetIMDSVersion(int version)
int S3fsCred::SetIMDSVersionHasLock(int version)
{
int old = IAM_api_version;
IAM_api_version = version;
@ -401,7 +403,7 @@ int S3fsCred::GetIMDSVersion() const
return IAM_api_version;
}
bool S3fsCred::SetIAMv2APIToken(const std::string& token)
bool S3fsCred::SetIAMv2APITokenHasLock(const std::string& token)
{
S3FS_PRN_INFO3("Setting AWS IMDSv2 API token to %s", token.c_str());
@ -427,35 +429,27 @@ const std::string& S3fsCred::GetIAMv2APIToken() const
//
bool S3fsCred::LoadIAMCredentials()
{
// url(check iam role)
std::string url;
std::string striamtoken;
std::string stribmsecret;
std::string cred;
// get parameters(check iam role)
if(!GetIAMCredentialsURL(url, true)){
return false;
}
const char* iam_v2_token = nullptr;
std::string str_iam_v2_token;
if(GetIMDSVersion() > 1){
str_iam_v2_token = GetIAMv2APIToken();
iam_v2_token = str_iam_v2_token.c_str();
striamtoken = GetIAMv2APIToken();
}
const char* ibm_secret_access_key = nullptr;
std::string str_ibm_secret_access_key;
if(IsIBMIAMAuth()){
str_ibm_secret_access_key = AWSSecretAccessKey;
ibm_secret_access_key = str_ibm_secret_access_key.c_str();
stribmsecret = AWSSecretAccessKey;
}
S3fsCurl s3fscurl;
std::string response;
if(!s3fscurl.GetIAMCredentials(url.c_str(), iam_v2_token, ibm_secret_access_key, response)){
return false;
}
if(!SetIAMCredentials(response.c_str())){
S3FS_PRN_ERR("Something error occurred, could not set IAM role name.");
// Get IAM Credentials
if(0 == get_iamcred_request(url, striamtoken, stribmsecret, cred)){
S3FS_PRN_DBG("Succeed to set IAM credentials");
}else{
S3FS_PRN_ERR("Something error occurred, could not set IAM credentials.");
return false;
}
return true;
@ -466,40 +460,45 @@ bool S3fsCred::LoadIAMCredentials()
//
bool S3fsCred::LoadIAMRoleFromMetaData()
{
const std::lock_guard<std::mutex> lock(token_lock);
if(!load_iamrole){
// nothing to do
return true;
}
std::string url;
std::string iamtoken;
{
const std::lock_guard<std::mutex> lock(token_lock);
if(load_iamrole){
// url(not check iam role)
std::string url;
if(!GetIAMCredentialsURL(url, false)){
return false;
}
const char* iam_v2_token = nullptr;
std::string str_iam_v2_token;
if(GetIMDSVersion() > 1){
str_iam_v2_token = GetIAMv2APIToken();
iam_v2_token = str_iam_v2_token.c_str();
iamtoken = GetIAMv2APIToken();
}
}
S3fsCurl s3fscurl;
std::string token;
if(!s3fscurl.GetIAMRoleFromMetaData(url.c_str(), iam_v2_token, token)){
return false;
}
// Get IAM Role token
std::string token;
if(0 != get_iamrole_request(url, iamtoken, token)){
S3FS_PRN_ERR("failed to get IAM Role token from meta data.");
return false;
}
if(!SetIAMRoleFromMetaData(token.c_str())){
S3FS_PRN_ERR("Something error occurred, could not set IAM role name.");
return false;
}
S3FS_PRN_INFO("loaded IAM role name = %s", GetIAMRole().c_str());
// Set
if(!SetIAMRoleFromMetaData(token.c_str())){
S3FS_PRN_ERR("Something error occurred, could not set IAM role name.");
return false;
}
return true;
}
bool S3fsCred::SetIAMCredentials(const char* response)
{
const std::lock_guard<std::mutex> lock(token_lock);
S3FS_PRN_INFO3("IAM credential response = \"%s\"", response);
iamcredmap_t keyval;
@ -530,6 +529,8 @@ bool S3fsCred::SetIAMCredentials(const char* response)
bool S3fsCred::SetIAMRoleFromMetaData(const char* response)
{
const std::lock_guard<std::mutex> lock(token_lock);
S3FS_PRN_INFO3("IAM role name response = \"%s\"", response ? response : "(null)");
std::string rolename;
@ -1372,7 +1373,7 @@ int S3fsCred::DetectParam(const char* arg)
SetIAMTokenField("\"access_token\"");
SetIAMExpiryField("\"expiration\"");
SetIAMFieldCount(2);
SetIMDSVersion(1);
SetIMDSVersionHasLock(1);
set_builtin_cred_opts = true;
return 0;
}
@ -1399,7 +1400,7 @@ int S3fsCred::DetectParam(const char* arg)
}
if(0 == strcmp(arg, "imdsv1only")){
SetIMDSVersion(1);
SetIMDSVersionHasLock(1);
set_builtin_cred_opts = true;
return 0;
}
@ -1410,7 +1411,7 @@ int S3fsCred::DetectParam(const char* arg)
return -1;
}
SetIsECS(true);
SetIMDSVersion(1);
SetIMDSVersionHasLock(1);
SetIAMCredentialsURL("http://169.254.170.2");
SetIAMFieldCount(5);
set_builtin_cred_opts = true;

View File

@ -115,14 +115,24 @@ class S3fsCred
bool SetIsIBMIAMAuth(bool flag);
int SetIMDSVersion(int version) REQUIRES(S3fsCred::token_lock);
int SetIMDSVersionHasLock(int version) REQUIRES(S3fsCred::token_lock);
int SetIMDSVersion(int version)
{
const std::lock_guard<std::mutex> lock(token_lock);
return SetIMDSVersionHasLock(version);
}
int GetIMDSVersion() const REQUIRES(S3fsCred::token_lock);
bool SetIAMv2APIToken(const std::string& token) REQUIRES(S3fsCred::token_lock);
bool SetIAMv2APITokenHasLock(const std::string& token) REQUIRES(S3fsCred::token_lock);
const std::string& GetIAMv2APIToken() const REQUIRES(S3fsCred::token_lock);
bool SetIAMRole(const char* role) REQUIRES(S3fsCred::token_lock);
const std::string& GetIAMRole() const REQUIRES(S3fsCred::token_lock);
const std::string& GetIAMRoleHasLock() const REQUIRES(S3fsCred::token_lock);
const std::string& GetIAMRole() const
{
const std::lock_guard<std::mutex> lock(token_lock);
return GetIAMRoleHasLock();
}
bool IsSetIAMRole() const REQUIRES(S3fsCred::token_lock);
size_t SetIAMFieldCount(size_t field_count);
std::string SetIAMCredentialsURL(const char* url);
@ -142,8 +152,8 @@ class S3fsCred
bool GetIAMCredentialsURL(std::string& url, bool check_iam_role) REQUIRES(S3fsCred::token_lock);
bool LoadIAMCredentials() REQUIRES(S3fsCred::token_lock);
bool SetIAMCredentials(const char* response) REQUIRES(S3fsCred::token_lock);
bool SetIAMRoleFromMetaData(const char* response) REQUIRES(S3fsCred::token_lock);
bool SetIAMCredentials(const char* response);
bool SetIAMRoleFromMetaData(const char* response);
bool SetExtCredLib(const char* arg);
bool IsSetExtCredLib() const;

View File

@ -244,7 +244,7 @@ static constexpr char help_string[] =
"\n"
" parallel_count (default=\"5\")\n"
" - number of parallel request for uploading big objects.\n"
" s3fs uploads large object (over 20MB) by multipart post request, \n"
" s3fs uploads large object (over 20MB) by multipart upload request, \n"
" and sends parallel requests.\n"
" This option limits parallel request count which s3fs requests \n"
" at once. It is necessary to set this value depending on a CPU \n"

591
src/s3fs_threadreqs.cpp Normal file
View File

@ -0,0 +1,591 @@
/*
* s3fs - FUSE-based file system backed by Amazon S3
*
* Copyright(C) 2007 Randy Rizun <rrizun@gmail.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "common.h"
#include "s3fs_threadreqs.h"
#include "threadpoolman.h"
#include "curl_util.h"
#include "s3fs_logger.h"
//-------------------------------------------------------------------
// Thread Worker functions for MultiThread Request
//-------------------------------------------------------------------
//
// Thread Worker function for head request
//
void* head_req_threadworker(void* arg)
{
auto* pthparam = static_cast<head_req_thparam*>(arg);
if(!pthparam || !pthparam->pmeta){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Head Request [path=%s][pmeta=%p]", pthparam->path.c_str(), pthparam->pmeta);
S3fsCurl s3fscurl;
pthparam->result = s3fscurl.HeadRequest(pthparam->path.c_str(), *(pthparam->pmeta));
return reinterpret_cast<void*>(pthparam->result);
}
//
// Thread Worker function for delete request
//
void* delete_req_threadworker(void* arg)
{
auto* pthparam = static_cast<delete_req_thparam*>(arg);
if(!pthparam){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Delete Request [path=%s]", pthparam->path.c_str());
S3fsCurl s3fscurl;
pthparam->result = s3fscurl.DeleteRequest(pthparam->path.c_str());
return reinterpret_cast<void*>(pthparam->result);
}
//
// Thread Worker function for put head request
//
void* put_head_req_threadworker(void* arg)
{
auto* pthparam = static_cast<put_head_req_thparam*>(arg);
if(!pthparam){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Put Head Request [path=%s][meta count=%lu][is copy=%s]", pthparam->path.c_str(), pthparam->meta.size(), (pthparam->isCopy ? "true" : "false"));
S3fsCurl s3fscurl(true);
pthparam->result = s3fscurl.PutHeadRequest(pthparam->path.c_str(), pthparam->meta, pthparam->isCopy);
return reinterpret_cast<void*>(pthparam->result);
}
//
// Thread Worker function for put request
//
void* put_req_threadworker(void* arg)
{
auto* pthparam = static_cast<put_req_thparam*>(arg);
if(!pthparam){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Put Request [path=%s][meta count=%lu][fd=%d][use_ahbe=%s]", pthparam->path.c_str(), pthparam->meta.size(), pthparam->fd, (pthparam->ahbe ? "true" : "false"));
S3fsCurl s3fscurl(pthparam->ahbe);
pthparam->result = s3fscurl.PutRequest(pthparam->path.c_str(), pthparam->meta, pthparam->fd);
return reinterpret_cast<void*>(pthparam->result);
}
//
// Thread Worker function for list bucket request
//
void* list_bucket_req_threadworker(void* arg)
{
auto* pthparam = static_cast<list_bucket_req_thparam*>(arg);
if(!pthparam || !(pthparam->presponseBody)){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("List Bucket Request [path=%s][query=%s]", pthparam->path.c_str(), pthparam->query.c_str());
S3fsCurl s3fscurl;
if(0 == (pthparam->result = s3fscurl.ListBucketRequest(pthparam->path.c_str(), pthparam->query.c_str()))){
*(pthparam->presponseBody) = s3fscurl.GetBodyData();
}
return reinterpret_cast<void*>(pthparam->result);
}
//
// Thread Worker function for check service request
//
void* check_service_req_threadworker(void* arg)
{
auto* pthparam = static_cast<check_service_req_thparam*>(arg);
if(!pthparam || !(pthparam->presponseCode) || !(pthparam->presponseBody)){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Check Service Request [path=%s][support compat dir=%s][force No SSE=%s]", pthparam->path.c_str(), (pthparam->support_compat_dir ? "true" : "false"), (pthparam->forceNoSSE ? "true" : "false"));
S3fsCurl s3fscurl;
if(0 == (pthparam->result = s3fscurl.CheckBucket(pthparam->path.c_str(), pthparam->support_compat_dir, pthparam->forceNoSSE))){
*(pthparam->presponseCode) = s3fscurl.GetLastResponseCode();
*(pthparam->presponseBody) = s3fscurl.GetBodyData();
}
return reinterpret_cast<void*>(pthparam->result);
}
//
// Thread Worker function for pre multipart upload request
//
void* pre_multipart_upload_req_threadworker(void* arg)
{
auto* pthparam = static_cast<pre_multipart_upload_req_thparam*>(arg);
if(!pthparam){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Pre Multipart Upload Request [path=%s][meta count=%lu]", pthparam->path.c_str(), pthparam->meta.size());
S3fsCurl s3fscurl(true);
pthparam->result = s3fscurl.PreMultipartUploadRequest(pthparam->path.c_str(), pthparam->meta, pthparam->upload_id);
return reinterpret_cast<void*>(pthparam->result);
}
//
// Thread Worker function for complete multipart upload request
//
void* complete_multipart_upload_threadworker(void* arg)
{
auto* pthparam = static_cast<complete_multipart_upload_req_thparam*>(arg);
if(!pthparam){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Complete Multipart Upload Request [path=%s][upload id=%s][etaglist=%lu]", pthparam->path.c_str(), pthparam->upload_id.c_str(), pthparam->etaglist.size());
S3fsCurl s3fscurl(true);
pthparam->result = s3fscurl.MultipartUploadComplete(pthparam->path.c_str(), pthparam->upload_id, pthparam->etaglist);
return reinterpret_cast<void*>(pthparam->result);
}
//
// Thread Worker function for abort multipart upload request
//
void* abort_multipart_upload_req_threadworker(void* arg)
{
auto* pthparam = static_cast<abort_multipart_upload_req_thparam*>(arg);
if(!pthparam){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Abort Multipart Upload Request [path=%s][upload id=%s]", pthparam->path.c_str(), pthparam->upload_id.c_str());
S3fsCurl s3fscurl(true);
pthparam->result = s3fscurl.AbortMultipartUpload(pthparam->path.c_str(), pthparam->upload_id);
return reinterpret_cast<void*>(pthparam->result);
}
//
// Thread Worker function for get object request
//
void* get_object_req_threadworker(void* arg)
{
auto* pthparam = static_cast<get_object_req_thparam*>(arg);
if(!pthparam){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Get Object Request [path=%s][fd=%d][start=%lld][size=%lld]", pthparam->path.c_str(), pthparam->fd, static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size));
sse_type_t ssetype = sse_type_t::SSE_DISABLE;
std::string ssevalue;
if(!get_object_sse_type(pthparam->path.c_str(), ssetype, ssevalue)){
S3FS_PRN_WARN("Failed to get SSE type for file(%s).", pthparam->path.c_str());
}
S3fsCurl s3fscurl;
pthparam->result = s3fscurl.GetObjectRequest(pthparam->path.c_str(), pthparam->fd, pthparam->start, pthparam->size, ssetype, ssevalue);
return reinterpret_cast<void*>(pthparam->result);
}
//-------------------------------------------------------------------
// Utility functions
//-------------------------------------------------------------------
//
// Calls S3fsCurl::HeadRequest via head_req_threadworker
//
int head_request(const std::string& strpath, headers_t& header)
{
// parameter for thread worker
head_req_thparam thargs;
thargs.path = strpath;
thargs.pmeta = &header;
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = head_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Await Head Request Thread Worker [path=%s]", strpath.c_str());
return -EIO;
}
if(0 != thargs.result){
S3FS_PRN_DBG("Await Head Request by error(%d) [path=%s]", thargs.result, strpath.c_str());
return thargs.result;
}
return 0;
}
//
// Calls S3fsCurl::DeleteRequest via delete_req_threadworker
//
int delete_request(const std::string& strpath)
{
// parameter for thread worker
delete_req_thparam thargs;
thargs.path = strpath;
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = delete_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Await Delete Request Thread Worker [path=%s]", strpath.c_str());
return -EIO;
}
if(0 != thargs.result){
S3FS_PRN_DBG("Await Delete Request by error(%d) [path=%s]", thargs.result, strpath.c_str());
return thargs.result;
}
return 0;
}
//
// Calls S3fsCurl::PutHeadRequest via put_head_req_threadworker
//
int put_head_request(const std::string& strpath, const headers_t& meta, bool is_copy)
{
// parameter for thread worker
put_head_req_thparam thargs;
thargs.path = strpath;
thargs.meta = meta; // copy
thargs.isCopy = is_copy;
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = put_head_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Await Put Head Request Thread Worker [path=%s][meta count=%lu][is copy=%s]", strpath.c_str(), meta.size(), (is_copy ? "true" : "false"));
return -EIO;
}
if(0 != thargs.result){
S3FS_PRN_ERR("Await Put Head Request by error(%d) [path=%s][meta count=%lu][is copy=%s]", thargs.result, strpath.c_str(), meta.size(), (is_copy ? "true" : "false"));
return thargs.result;
}
return 0;
}
//
// Calls S3fsCurl::PutRequest via put_req_threadworker
//
int put_request(const std::string& strpath, const headers_t& meta, int fd, bool ahbe)
{
// parameter for thread worker
put_req_thparam thargs;
thargs.path = strpath;
thargs.meta = meta; // copy
thargs.fd = fd; // fd=-1 means for creating zero byte object.
thargs.ahbe = ahbe;
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = put_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Await Put Request Thread Worker [path=%s][meta count=%lu][fd=%d][use_ahbe=%s]", strpath.c_str(), meta.size(), fd, (ahbe ? "true" : "false"));
return -EIO;
}
if(0 != thargs.result){
S3FS_PRN_ERR("Await Put Request by error(%d) [path=%s][meta count=%lu][fd=%d][use_ahbe=%s]", thargs.result, strpath.c_str(), meta.size(), fd, (ahbe ? "true" : "false"));
return thargs.result;
}
return 0;
}
//
// Calls S3fsCurl::ListBucketRequest via list_bucket_req_threadworker
//
int list_bucket_request(const std::string& strpath, const std::string& query, std::string& responseBody)
{
// parameter for thread worker
list_bucket_req_thparam thargs;
thargs.path = strpath;
thargs.query = query;
thargs.presponseBody = &responseBody;
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = list_bucket_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Await List Bucket Request Thread Worker [path=%s][query=%s]", strpath.c_str(), query.c_str());
return -EIO;
}
if(0 != thargs.result){
S3FS_PRN_ERR("Await List Bucket Request by error(%d) [path=%s][query=%s]", thargs.result, strpath.c_str(), query.c_str());
return thargs.result;
}
return 0;
}
//
// Calls S3fsCurl::CheckBucket via check_service_req_threadworker
//
int check_service_request(const std::string& strpath, bool forceNoSSE, bool support_compat_dir, long& responseCode, std::string& responseBody)
{
// parameter for thread worker
check_service_req_thparam thargs;
thargs.path = strpath;
thargs.forceNoSSE = forceNoSSE;
thargs.support_compat_dir = support_compat_dir;
thargs.presponseCode = &responseCode;
thargs.presponseBody = &responseBody;
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = check_service_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Await Check Service Request Thread Worker [path=%s][support compat dir=%s][force No SSE=%s]", strpath.c_str(), (support_compat_dir ? "true" : "false"), (forceNoSSE ? "true" : "false"));
return -EIO;
}
if(0 != thargs.result){
S3FS_PRN_ERR("Await Check Service Request by error(%d) [path=%s][support compat dir=%s][force No SSE=%s]", thargs.result, strpath.c_str(), (support_compat_dir ? "true" : "false"), (forceNoSSE ? "true" : "false"));
return thargs.result;
}
return 0;
}
//
// Calls S3fsCurl::PreMultipartUploadRequest via pre_multipart_upload_req_threadworker
//
// [NOTE]
// If the request is successful, sets upload_id.
//
int pre_multipart_upload_request(const std::string& path, const headers_t& meta, std::string& upload_id)
{
// parameter for thread worker
pre_multipart_upload_req_thparam thargs;
thargs.path = path;
thargs.meta = meta; // copy
thargs.upload_id.clear(); // clear
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = pre_multipart_upload_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Pre Multipart Upload Request Thread Worker");
return -EIO;
}
if(0 != thargs.result){
S3FS_PRN_ERR("Pre Multipart Upload Request(path=%s) returns with error(%d)", path.c_str(), thargs.result);
return thargs.result;
}
// set upload_id
upload_id = thargs.upload_id;
return 0;
}
//
// Calls S3fsCurl::MultipartUploadComplete via complete_multipart_upload_threadworker
//
int complete_multipart_upload_request(const std::string& path, const std::string& upload_id, const etaglist_t& parts)
{
// parameter for thread worker
complete_multipart_upload_req_thparam thargs;
thargs.path = path;
thargs.upload_id = upload_id;
thargs.etaglist = parts; // copy
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = complete_multipart_upload_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Complete Multipart Upload Request Thread Worker");
return -EIO;
}
if(0 != thargs.result){
S3FS_PRN_ERR("Complete Multipart Upload Request(path=%s) returns with error(%d)", path.c_str(), thargs.result);
return thargs.result;
}
return 0;
}
//
// Calls S3fsCurl::AbortMultipartUpload via abort_multipart_upload_req_threadworker
//
int abort_multipart_upload_request(const std::string& path, const std::string& upload_id)
{
// parameter for thread worker
abort_multipart_upload_req_thparam thargs;
thargs.path = path;
thargs.upload_id = upload_id;
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = abort_multipart_upload_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Abort Multipart Upload Request Thread Worker");
return -EIO;
}
if(0 != thargs.result){
S3FS_PRN_ERR("Abort Multipart Upload Request(path=%s) returns with error(%d)", path.c_str(), thargs.result);
return thargs.result;
}
return 0;
}
//
// Calls S3fsCurl::GetObjectRequest via get_object_req_threadworker
//
int get_object_request(const std::string& path, int fd, off_t start, off_t size)
{
// parameter for thread worker
get_object_req_thparam thargs;
thargs.path = path;
thargs.fd = fd;
thargs.start = start;
thargs.size = size;
thargs.result = 0;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = get_object_req_threadworker;
// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Await Get Object Request Thread Worker [path=%s][fd=%d][start=%lld][size=%lld]", path.c_str(), fd, static_cast<long long int>(start), static_cast<long long int>(size));
return -EIO;
}
if(0 != thargs.result){
S3FS_PRN_ERR("Await Get Object Request by error(%d) [path=%s][fd=%d][start=%lld][size=%lld]", thargs.result, path.c_str(), fd, static_cast<long long int>(start), static_cast<long long int>(size));
return thargs.result;
}
return 0;
}
//-------------------------------------------------------------------
// Direct Call Utility Functions
//-------------------------------------------------------------------
// These functions (mainly IAM token-related) do not dispatch their
// requests to a thread worker; they call S3fsCurl directly.
//
// [NOTE]
// The IAM token requests are issued from the S3fsCurl::RequestPerform
// method when the IAM token needs to be updated during request
// processing. (NOTE: Each request is already executed in a thread.)
// If the number of threads has already reached the limit when these
// functions are called, they would block until a thread becomes
// available to run them, and that could leave all processing blocked.
// Therefore, the following functions(IAM token requests) are not
// processed by a thread worker, but send the request directly.
//
// Any other request that is called from within a thread worker should
// be handled in the same way(called directly).
//
//
// Directly calls S3fsCurl::GetIAMv2ApiToken
//
int get_iamv2api_token_request(const std::string& strurl, int tokenttl, const std::string& strttlhdr, std::string& token)
{
S3FS_PRN_INFO3("Get IAMv2 API Toekn Request directly [url=%s][token ttl=%d][ttl header=%s]", strurl.c_str(), tokenttl, strttlhdr.c_str());
S3fsCurl s3fscurl;
return s3fscurl.GetIAMv2ApiToken(strurl.c_str(), tokenttl, strttlhdr.c_str(), token);
}
//
// Directly calls S3fsCurl::GetIAMRoleFromMetaData
//
int get_iamrole_request(const std::string& strurl, const std::string& striamtoken, std::string& token)
{
S3FS_PRN_INFO3("Get IAM Role Request directly [url=%s][iam token=%s]", strurl.c_str(), striamtoken.c_str());
S3fsCurl s3fscurl;
int result = 0;
if(!s3fscurl.GetIAMRoleFromMetaData(strurl.c_str(), (striamtoken.empty() ? nullptr : striamtoken.c_str()), token)){
S3FS_PRN_ERR("Something error occurred during getting IAM Role from MetaData.");
result = -EIO;
}
return result;
}
//
// Directly calls S3fsCurl::GetIAMCredentials
//
int get_iamcred_request(const std::string& strurl, const std::string& striamtoken, const std::string& stribmsecret, std::string& cred)
{
S3FS_PRN_INFO3("Get IAM Credentials Request directly [url=%s][iam token=%s][ibm secrect access key=%s]", strurl.c_str(), striamtoken.c_str(), stribmsecret.c_str());
S3fsCurl s3fscurl;
int result = 0;
if(!s3fscurl.GetIAMCredentials(strurl.c_str(), (striamtoken.empty() ? nullptr : striamtoken.c_str()), (stribmsecret.empty() ? nullptr : stribmsecret.c_str()), cred)){
S3FS_PRN_ERR("Something error occurred during getting IAM Credentials.");
result = -EIO;
}
return result;
}
/*
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: expandtab sw=4 ts=4 fdm=marker
* vim<600: expandtab sw=4 ts=4
*/

187
src/s3fs_threadreqs.h Normal file
View File

@ -0,0 +1,187 @@
/*
* s3fs - FUSE-based file system backed by Amazon S3
*
* Copyright(C) 2007 Randy Rizun <rrizun@gmail.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#ifndef S3FS_THREADREQS_H_
#define S3FS_THREADREQS_H_
#include <string>
#include "common.h"
#include "metaheader.h"
#include "curl.h"
//-------------------------------------------------------------------
// Structures for MultiThread Request
//-------------------------------------------------------------------
//
// Head Request parameter structure for Thread Pool.
//
struct head_req_thparam
{
std::string path;
headers_t* pmeta = nullptr;
int result = 0;
};
//
// Delete Request parameter structure for Thread Pool.
//
struct delete_req_thparam
{
std::string path;
int result = 0;
};
//
// Put Head Request parameter structure for Thread Pool.
//
struct put_head_req_thparam
{
std::string path;
headers_t meta;
bool isCopy = false;
int result = 0;
};
//
// Put Request parameter structure for Thread Pool.
//
struct put_req_thparam
{
std::string path;
headers_t meta;
int fd = -1;
bool ahbe = false;
int result = 0;
};
//
// List Bucket Request parameter structure for Thread Pool.
//
struct list_bucket_req_thparam
{
std::string path;
std::string query;
std::string* presponseBody = nullptr;
int result = 0;
};
//
// Check Service Request parameter structure for Thread Pool.
//
struct check_service_req_thparam
{
std::string path;
bool forceNoSSE = false;
bool support_compat_dir = false;
long* presponseCode = nullptr;
std::string* presponseBody = nullptr;
int result = 0;
};
//
// Pre Multipart Upload Request parameter structure for Thread Pool.
//
struct pre_multipart_upload_req_thparam
{
std::string path;
headers_t meta;
std::string upload_id;
int result = 0;
};
//
// Complete Multipart Upload Request parameter structure for Thread Pool.
//
struct complete_multipart_upload_req_thparam
{
std::string path;
std::string upload_id;
etaglist_t etaglist;
int result = 0;
};
//
// Abort Multipart Upload Request parameter structure for Thread Pool.
//
struct abort_multipart_upload_req_thparam
{
std::string path;
std::string upload_id;
int result = 0;
};
//
// Get Object Request parameter structure for Thread Pool.
//
struct get_object_req_thparam
{
std::string path;
int fd = -1;
off_t start = 0;
off_t size = 0;
int result = 0;
};
//-------------------------------------------------------------------
// Thread Worker functions for MultiThread Request
//-------------------------------------------------------------------
void* head_req_threadworker(void* arg);
void* delete_req_threadworker(void* arg);
void* put_head_req_threadworker(void* arg);
void* put_req_threadworker(void* arg);
void* list_bucket_req_threadworker(void* arg);
void* check_service_req_threadworker(void* arg);
void* pre_multipart_upload_req_threadworker(void* arg);
void* complete_multipart_upload_threadworker(void* arg);
void* abort_multipart_upload_req_threadworker(void* arg);
void* get_object_req_threadworker(void* arg);
//-------------------------------------------------------------------
// Utility functions
//-------------------------------------------------------------------
int head_request(const std::string& strpath, headers_t& header);
int delete_request(const std::string& strpath);
int put_head_request(const std::string& strpath, const headers_t& meta, bool is_copy);
int put_request(const std::string& strpath, const headers_t& meta, int fd, bool ahbe);
int list_bucket_request(const std::string& strpath, const std::string& query, std::string& responseBody);
int check_service_request(const std::string& strpath, bool forceNoSSE, bool support_compat_dir, long& responseCode, std::string& responseBody);
int pre_multipart_upload_request(const std::string& path, const headers_t& meta, std::string& upload_id);
int complete_multipart_upload_request(const std::string& path, const std::string& upload_id, const etaglist_t& parts);
int abort_multipart_upload_request(const std::string& path, const std::string& upload_id);
int get_object_request(const std::string& path, int fd, off_t start, off_t size);
//-------------------------------------------------------------------
// Direct Call Utility Functions
//-------------------------------------------------------------------
int get_iamv2api_token_request(const std::string& strurl, int tokenttl, const std::string& strttlhdr, std::string& token);
int get_iamrole_request(const std::string& strurl, const std::string& striamtoken, std::string& token);
int get_iamcred_request(const std::string& strurl, const std::string& striamtoken, const std::string& stribmsecret, std::string& cred);
#endif // S3FS_THREADREQS_H_
/*
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: expandtab sw=4 ts=4 fdm=marker
* vim<600: expandtab sw=4 ts=4
*/
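
For illustration only (not part of this commit): a minimal sketch of how a caller might use one of the await-style helpers declared above, assuming ThreadPoolMan::Initialize() has already been called at startup. The function name example_stat_object is hypothetical; head_request(), headers_t and S3FS_PRN_ERR come from the headers shown in this change.

#include <string>

#include "s3fs_threadreqs.h"
#include "s3fs_logger.h"

// Synchronously fetch object headers: head_request() hands the HEAD
// request to a pooled thread and blocks until that worker completes.
static int example_stat_object(const std::string& path, headers_t& meta)
{
    int result = head_request(path, meta);
    if(0 != result){
        S3FS_PRN_ERR("example: head request failed(%d) [path=%s]", result, path.c_str());
    }
    return result;
}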

View File

@ -58,10 +58,41 @@ bool ThreadPoolMan::Instruct(const thpoolman_param& param)
S3FS_PRN_WARN("The singleton object is not initialized yet.");
return false;
}
if(!param.psem){
S3FS_PRN_ERR("Thread parameter Semaphore is null.");
return false;
}
ThreadPoolMan::singleton->SetInstruction(param);
return true;
}
bool ThreadPoolMan::AwaitInstruct(const thpoolman_param& param)
{
if(!ThreadPoolMan::singleton){
S3FS_PRN_WARN("The singleton object is not initialized yet.");
return false;
}
if(param.psem){
S3FS_PRN_ERR("Thread parameter Semaphore must be null.");
return false;
}
// Setup local thpoolman_param structure with local Semaphore
thpoolman_param local_param;
Semaphore await_sem(0);
local_param.args = param.args;
local_param.psem = &await_sem;
local_param.pfunc = param.pfunc;
// Set parameters and run thread worker
ThreadPoolMan::singleton->SetInstruction(local_param);
// wait until the thread is complete
await_sem.acquire();
return true;
}
//
// Thread worker
//
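
For illustration only (not part of this commit): a short sketch of the two dispatch styles under the new contract, where Instruct() requires the caller to supply a Semaphore and wait on it, while AwaitInstruct() requires a null psem and blocks internally. The names example_thparam, example_threadworker and example_dispatch are hypothetical, and the sketch assumes the pool is already initialized and that the Semaphore type is visible through threadpoolman.h.

#include <cerrno>

#include "threadpoolman.h"

struct example_thparam
{
    int result = 0;
};

// Hypothetical worker with the pool's signature: cast the argument,
// do the work, store the result and return it as the thread result.
static void* example_threadworker(void* arg)
{
    auto* param = static_cast<example_thparam*>(arg);
    if(!param){
        return reinterpret_cast<void*>(-EIO);
    }
    param->result = 0;      // the real request would be performed here
    return reinterpret_cast<void*>(param->result);
}

static void example_dispatch()
{
    example_thparam thargs;

    // (1) Caller-managed wait: Instruct() requires a non-null Semaphore,
    //     which is posted when the worker has finished.
    Semaphore sem(0);
    thpoolman_param param;
    param.args  = &thargs;
    param.psem  = &sem;
    param.pfunc = example_threadworker;
    if(ThreadPoolMan::Instruct(param)){
        sem.acquire();
    }

    // (2) Synchronous dispatch: AwaitInstruct() requires psem to be null,
    //     creates its own Semaphore and blocks until the worker completes.
    thpoolman_param await_param;
    await_param.args  = &thargs;
    await_param.psem  = nullptr;
    await_param.pfunc = example_threadworker;
    ThreadPoolMan::AwaitInstruct(await_param);
}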

View File

@ -92,6 +92,7 @@ class ThreadPoolMan
static bool Initialize(int count);
static void Destroy();
static bool Instruct(const thpoolman_param& pparam);
static bool AwaitInstruct(const thpoolman_param& param);
};
#endif // S3FS_THREADPOOLMAN_H_