Merge pull request #965 from ggtakec/fix_multi_x

Improvement of curl session pool for multipart
This commit is contained in:
Takeshi Nakatani 2019-02-27 22:44:25 +09:00 committed by GitHub
commit 0d43d070cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 268 additions and 183 deletions

View File

@ -238,7 +238,6 @@ const char* BodyData::str() const
//-------------------------------------------------------------------
// Class CurlHandlerPool
//-------------------------------------------------------------------
bool CurlHandlerPool::Init()
{
if (0 != pthread_mutex_init(&mLock, NULL)) {
@ -246,14 +245,14 @@ bool CurlHandlerPool::Init()
return false;
}
mHandlers = new CURL*[mMaxHandlers](); // this will init the array to 0
for (int i = 0; i < mMaxHandlers; ++i, ++mIndex) {
mHandlers[i] = curl_easy_init();
if (!mHandlers[i]) {
for(int cnt = 0; cnt < mMaxHandlers; ++cnt){
CURL* hCurl = curl_easy_init();
if(!hCurl){
S3FS_PRN_ERR("Init curl handlers pool failed");
Destroy();
return false;
}
mPool.push_back(hCurl);
}
return true;
@ -261,13 +260,13 @@ bool CurlHandlerPool::Init()
bool CurlHandlerPool::Destroy()
{
assert(mIndex >= -1 && mIndex < mMaxHandlers);
for (int i = 0; i <= mIndex; ++i) {
curl_easy_cleanup(mHandlers[i]);
while(!mPool.empty()){
CURL* hCurl = mPool.back();
mPool.pop_back();
if(hCurl){
curl_easy_cleanup(hCurl);
}
}
delete[] mHandlers;
if (0 != pthread_mutex_destroy(&mLock)) {
S3FS_PRN_ERR("Destroy curl handlers lock failed");
return false;
@ -276,47 +275,51 @@ bool CurlHandlerPool::Destroy()
return true;
}
CURL* CurlHandlerPool::GetHandler()
CURL* CurlHandlerPool::GetHandler(bool only_pool)
{
CURL* h = NULL;
assert(mIndex >= -1 && mIndex < mMaxHandlers);
CURL* hCurl = NULL;
{
AutoLock lock(&mLock);
if (mIndex >= 0) {
S3FS_PRN_DBG("Get handler from pool: %d", mIndex);
h = mHandlers[mIndex--];
if(!mPool.empty()){
hCurl = mPool.back();
mPool.pop_back();
S3FS_PRN_DBG("Get handler from pool: rest = %d", static_cast<int>(mPool.size()));
}
}
if (!h) {
S3FS_PRN_INFO("Pool empty: create new handler");
h = curl_easy_init();
if(only_pool){
return hCurl;
}
return h;
if(!hCurl){
S3FS_PRN_INFO("Pool empty: force to create new handler");
hCurl = curl_easy_init();
}
return hCurl;
}
void CurlHandlerPool::ReturnHandler(CURL* h)
void CurlHandlerPool::ReturnHandler(CURL* hCurl, bool restore_pool)
{
bool needCleanup = true;
assert(mIndex >= -1 && mIndex < mMaxHandlers);
{
AutoLock lock(&mLock);
if (mIndex < mMaxHandlers - 1) {
mHandlers[++mIndex] = h;
curl_easy_reset(h);
needCleanup = false;
S3FS_PRN_DBG("Return handler to pool: %d", mIndex);
}
if(!hCurl){
return;
}
if (needCleanup) {
if(restore_pool){
AutoLock lock(&mLock);
S3FS_PRN_DBG("Return handler to pool");
mPool.push_back(hCurl);
while(mMaxHandlers <= static_cast<int>(mPool.size())){
CURL* hOldCurl = mPool.front();
mPool.pop_front();
if(hOldCurl){
S3FS_PRN_INFO("Pool full: destroy the oldest handler");
curl_easy_cleanup(hOldCurl);
}
}
}else{
S3FS_PRN_INFO("Pool full: destroy the handler");
curl_easy_cleanup(h);
curl_easy_cleanup(hCurl);
}
}
@ -409,6 +412,12 @@ bool S3fsCurl::InitS3fsCurl(const char* MimeFile)
if(!S3fsCurl::InitCryptMutex()){
return false;
}
// [NOTE]
// sCurlPoolSize must be over parrallel(or multireq) count.
//
if(sCurlPoolSize < std::max(GetMaxParallelCount(), GetMaxMultiRequest())){
sCurlPoolSize = std::max(GetMaxParallelCount(), GetMaxMultiRequest());
}
sCurlPool = new CurlHandlerPool(sCurlPoolSize);
if (!sCurlPool->Init()) {
return false;
@ -1528,6 +1537,91 @@ int S3fsCurl::ParallelGetObjectRequest(const char* tpath, int fd, off_t start, s
return result;
}
bool S3fsCurl::UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return false;
}
if(!s3fscurl->CreateCurlHandle()){
return false;
}
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_URL, s3fscurl->url.c_str());
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_UPLOAD, true); // HTTP PUT
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEDATA, (void*)(s3fscurl->bodydata));
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HEADERDATA, (void*)&(s3fscurl->responseHeaders));
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HEADERFUNCTION, HeaderCallback);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(s3fscurl->partdata.size)); // Content-Length
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_READFUNCTION, UploadReadCallback);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_READDATA, (void*)s3fscurl);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HTTPHEADER, s3fscurl->requestHeaders);
S3fsCurl::AddUserAgent(s3fscurl->hCurl); // put User-Agent
return true;
}
bool S3fsCurl::CopyMultipartPostSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return false;
}
if(!s3fscurl->CreateCurlHandle()){
return false;
}
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_URL, s3fscurl->url.c_str());
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_UPLOAD, true); // HTTP PUT
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEDATA, (void*)(s3fscurl->bodydata));
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HEADERDATA, (void*)(s3fscurl->headdata));
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HEADERFUNCTION, WriteMemoryCallback);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_INFILESIZE, 0); // Content-Length
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HTTPHEADER, s3fscurl->requestHeaders);
S3fsCurl::AddUserAgent(s3fscurl->hCurl); // put User-Agent
return true;
}
bool S3fsCurl::PreGetObjectRequestSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return false;
}
if(!s3fscurl->CreateCurlHandle()){
return false;
}
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_URL, s3fscurl->url.c_str());
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HTTPHEADER, s3fscurl->requestHeaders);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEFUNCTION, DownloadWriteCallback);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEDATA, (void*)s3fscurl);
S3fsCurl::AddUserAgent(s3fscurl->hCurl); // put User-Agent
return true;
}
bool S3fsCurl::PreHeadRequestSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return false;
}
if(!s3fscurl->CreateCurlHandle()){
return false;
}
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_URL, s3fscurl->url.c_str());
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_NOBODY, true); // HEAD
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_FILETIME, true); // Last-Modified
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HTTPHEADER, s3fscurl->requestHeaders);
// responseHeaders
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HEADERDATA, (void*)&(s3fscurl->responseHeaders));
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HEADERFUNCTION, HeaderCallback);
S3fsCurl::AddUserAgent(s3fscurl->hCurl); // put User-Agent
return true;
}
bool S3fsCurl::ParseIAMCredentialResponse(const char* response, iamcredmap_t& keyval)
{
if(!response){
@ -1735,7 +1829,7 @@ S3fsCurl::S3fsCurl(bool ahbe) :
bodydata(NULL), headdata(NULL), LastResponseCode(-1), postdata(NULL), postdata_remaining(0), is_use_ahbe(ahbe),
retry_count(0), b_infile(NULL), b_postdata(NULL), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0),
b_ssekey_pos(-1), b_ssevalue(""), b_ssetype(SSE_DISABLE), op(""), query_string(""),
sem(NULL), completed_tids_lock(NULL), completed_tids(NULL)
sem(NULL), completed_tids_lock(NULL), completed_tids(NULL), fpRazySetup(NULL)
{
}
@ -1754,6 +1848,9 @@ bool S3fsCurl::ResetHandle()
curl_easy_setopt(hCurl, CURLOPT_PROGRESSFUNCTION, S3fsCurl::CurlProgress);
curl_easy_setopt(hCurl, CURLOPT_PROGRESSDATA, hCurl);
// curl_easy_setopt(hCurl, CURLOPT_FORBID_REUSE, 1);
curl_easy_setopt(hCurl, CURLOPT_TCP_KEEPALIVE, 1);
// curl_easy_setopt(hCurl, CURLOPT_KEEP_SENDING_ON_ERROR, 1); // after 7.51.0
// curl_easy_setopt(hCurl, CURLOPT_SSL_ENABLE_ALPN, 0); // after 7.36.0 for disable ALPN for s3 server
if(type != REQTYPE_IAMCRED && type != REQTYPE_IAMROLE){
// REQTYPE_IAMCRED and REQTYPE_IAMROLE are always HTTP
@ -1788,25 +1885,30 @@ bool S3fsCurl::ResetHandle()
return true;
}
bool S3fsCurl::CreateCurlHandle(bool force)
bool S3fsCurl::CreateCurlHandle(bool only_pool, bool remake)
{
AutoLock lock(&S3fsCurl::curl_handles_lock);
if(hCurl){
if(!force){
S3FS_PRN_WARN("already create handle.");
return false;
}
if(!DestroyCurlHandle(true)){
if(hCurl && remake){
if(!DestroyCurlHandle(false)){
S3FS_PRN_ERR("could not destroy handle.");
return false;
}
S3FS_PRN_INFO3("already has handle, so destroyed it.");
S3FS_PRN_INFO3("already has handle, so destroyed it or restored it to pool.");
}
if(NULL == (hCurl = sCurlPool->GetHandler())){
S3FS_PRN_ERR("Failed to create handle.");
return false;
if(!hCurl){
if(NULL == (hCurl = sCurlPool->GetHandler(only_pool))){
if(!only_pool){
S3FS_PRN_ERR("Failed to create handle.");
return false;
}else{
// [NOTE]
// urther initialization processing is left to razy processing to be executed later.
// (Currently we do not use only_pool=true, but this code is remained for the future)
return true;
}
}
}
// [NOTE]
@ -1823,20 +1925,18 @@ bool S3fsCurl::CreateCurlHandle(bool force)
return true;
}
bool S3fsCurl::DestroyCurlHandle(bool force)
bool S3fsCurl::DestroyCurlHandle(bool restore_pool, bool clear_internal_data)
{
ClearInternalData();
if(clear_internal_data){
ClearInternalData();
}
if(hCurl){
AutoLock lock(&S3fsCurl::curl_handles_lock);
S3fsCurl::curl_times.erase(hCurl);
S3fsCurl::curl_progress.erase(hCurl);
if(retry_count == 0 || force){
sCurlPool->ReturnHandler(hCurl);
}else{
curl_easy_cleanup(hCurl);
}
sCurlPool->ReturnHandler(hCurl, restore_pool);
hCurl = NULL;
}else{
return false;
@ -1879,6 +1979,8 @@ bool S3fsCurl::ClearInternalData()
b_partdata_size = 0;
partdata.clear();
fpRazySetup = NULL;
S3FS_MALLOCTRIM(0);
return true;
@ -1891,16 +1993,21 @@ bool S3fsCurl::SetUseAhbe(bool ahbe)
return old;
}
bool S3fsCurl::GetResponseCode(long& responseCode)
bool S3fsCurl::GetResponseCode(long& responseCode, bool from_curl_handle)
{
if(!hCurl){
return false;
}
responseCode = -1;
if(CURLE_OK != curl_easy_getinfo(hCurl, CURLINFO_RESPONSE_CODE, &LastResponseCode)){
return false;
if(!from_curl_handle){
responseCode = LastResponseCode;
}else{
if(!hCurl){
return false;
}
if(CURLE_OK != curl_easy_getinfo(hCurl, CURLINFO_RESPONSE_CODE, &LastResponseCode)){
return false;
}
responseCode = LastResponseCode;
}
responseCode = LastResponseCode;
return true;
}
@ -1945,12 +2052,7 @@ bool S3fsCurl::RemakeHandle()
partdata.size = b_partdata_size;
// reset handle
curl_easy_cleanup(hCurl);
hCurl = curl_easy_init();
ResetHandle();
// disable ssl cache, so that a new session will be created
curl_easy_setopt(hCurl, CURLOPT_SSL_SESSIONID_CACHE, 0);
curl_easy_setopt(hCurl, CURLOPT_SHARE, NULL);
// set options
switch(type){
@ -2558,7 +2660,7 @@ int S3fsCurl::DeleteRequest(const char* tpath)
if(!tpath){
return -1;
}
if(!CreateCurlHandle(true)){
if(!CreateCurlHandle()){
return -1;
}
string resource;
@ -2600,7 +2702,7 @@ int S3fsCurl::GetIAMCredentials()
// at first set type for handle
type = REQTYPE_IAMCRED;
if(!CreateCurlHandle(true)){
if(!CreateCurlHandle()){
return -EIO;
}
@ -2668,7 +2770,7 @@ bool S3fsCurl::LoadIAMRoleFromMetaData()
// at first set type for handle
type = REQTYPE_IAMROLE;
if(!CreateCurlHandle(true)){
if(!CreateCurlHandle()){
return false;
}
@ -2744,9 +2846,6 @@ bool S3fsCurl::PreHeadRequest(const char* tpath, const char* bpath, const char*
if(!tpath){
return false;
}
if(!CreateCurlHandle(true)){
return false;
}
string resource;
string turl;
MakeUrlResource(get_realpath(tpath).c_str(), resource, turl);
@ -2773,15 +2872,8 @@ bool S3fsCurl::PreHeadRequest(const char* tpath, const char* bpath, const char*
type = REQTYPE_HEAD;
insertAuthHeaders();
curl_easy_setopt(hCurl, CURLOPT_URL, url.c_str());
curl_easy_setopt(hCurl, CURLOPT_NOBODY, true); // HEAD
curl_easy_setopt(hCurl, CURLOPT_FILETIME, true); // Last-Modified
curl_easy_setopt(hCurl, CURLOPT_HTTPHEADER, requestHeaders);
// responseHeaders
curl_easy_setopt(hCurl, CURLOPT_HEADERDATA, (void*)&responseHeaders);
curl_easy_setopt(hCurl, CURLOPT_HEADERFUNCTION, HeaderCallback);
S3fsCurl::AddUserAgent(hCurl); // put User-Agent
// set razy function
fpRazySetup = PreHeadRequestSetCurlOpts;
return true;
}
@ -2793,7 +2885,7 @@ int S3fsCurl::HeadRequest(const char* tpath, headers_t& meta)
S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath));
// At first, try to get without SSE-C headers
if(!PreHeadRequest(tpath) || 0 != (result = RequestPerform())){
if(!PreHeadRequest(tpath) || !fpRazySetup || !fpRazySetup(this) || 0 != (result = RequestPerform())){
// If has SSE-C keys, try to get with all SSE-C keys.
for(int pos = 0; static_cast<size_t>(pos) < S3fsCurl::sseckeys.size(); pos++){
if(!DestroyCurlHandle()){
@ -2802,6 +2894,10 @@ int S3fsCurl::HeadRequest(const char* tpath, headers_t& meta)
if(!PreHeadRequest(tpath, NULL, NULL, pos)){
break;
}
if(!fpRazySetup || !fpRazySetup(this)){
S3FS_PRN_ERR("Failed to razy setup in single head request.");
break;
}
if(0 == (result = RequestPerform())){
break;
}
@ -2840,7 +2936,7 @@ int S3fsCurl::PutHeadRequest(const char* tpath, headers_t& meta, bool is_copy)
if(!tpath){
return -1;
}
if(!CreateCurlHandle(true)){
if(!CreateCurlHandle()){
return -1;
}
string resource;
@ -2976,7 +3072,7 @@ int S3fsCurl::PutRequest(const char* tpath, headers_t& meta, int fd)
S3FS_PRN_INFO3("create zero byte file object.");
}
if(!CreateCurlHandle(true)){
if(!CreateCurlHandle()){
if(file){
fclose(file);
}
@ -3075,9 +3171,6 @@ int S3fsCurl::PreGetObjectRequest(const char* tpath, int fd, off_t start, ssize_
return -1;
}
if(!CreateCurlHandle(true)){
return -1;
}
string resource;
string turl;
MakeUrlResource(get_realpath(tpath).c_str(), resource, turl);
@ -3103,12 +3196,8 @@ int S3fsCurl::PreGetObjectRequest(const char* tpath, int fd, off_t start, ssize_
type = REQTYPE_GET;
insertAuthHeaders();
// setopt
curl_easy_setopt(hCurl, CURLOPT_URL, url.c_str());
curl_easy_setopt(hCurl, CURLOPT_HTTPHEADER, requestHeaders);
curl_easy_setopt(hCurl, CURLOPT_WRITEFUNCTION, S3fsCurl::DownloadWriteCallback);
curl_easy_setopt(hCurl, CURLOPT_WRITEDATA, (void*)this);
S3fsCurl::AddUserAgent(hCurl); // put User-Agent
// set razy function
fpRazySetup = PreGetObjectRequestSetCurlOpts;
// set info for callback func.
// (use only fd, startpos and size, other member is not used.)
@ -3143,6 +3232,10 @@ int S3fsCurl::GetObjectRequest(const char* tpath, int fd, off_t start, ssize_t s
if(0 != (result = PreGetObjectRequest(tpath, fd, start, size, ssetype, ssevalue))){
return result;
}
if(!fpRazySetup || !fpRazySetup(this)){
S3FS_PRN_ERR("Failed to razy setup in single get object request.");
return -1;
}
S3FS_PRN_INFO3("downloading... [path=%s][fd=%d]", tpath, fd);
@ -3156,7 +3249,7 @@ int S3fsCurl::CheckBucket()
{
S3FS_PRN_INFO3("check a bucket.");
if(!CreateCurlHandle(true)){
if(!CreateCurlHandle()){
return -1;
}
string resource;
@ -3194,7 +3287,7 @@ int S3fsCurl::ListBucketRequest(const char* tpath, const char* query)
if(!tpath){
return -1;
}
if(!CreateCurlHandle(true)){
if(!CreateCurlHandle()){
return -1;
}
string resource;
@ -3242,7 +3335,7 @@ int S3fsCurl::PreMultipartPostRequest(const char* tpath, headers_t& meta, string
if(!tpath){
return -1;
}
if(!CreateCurlHandle(true)){
if(!CreateCurlHandle()){
return -1;
}
string resource;
@ -3374,7 +3467,7 @@ int S3fsCurl::CompleteMultipartPostRequest(const char* tpath, string& upload_id,
postdata_remaining = postContent.size(); // without null
b_postdata_remaining = postdata_remaining;
if(!CreateCurlHandle(true)){
if(!CreateCurlHandle()){
return -1;
}
string resource;
@ -3421,7 +3514,7 @@ int S3fsCurl::MultipartListRequest(string& body)
{
S3FS_PRN_INFO3("list request(multipart)");
if(!CreateCurlHandle(true)){
if(!CreateCurlHandle()){
return -1;
}
string resource;
@ -3468,7 +3561,7 @@ int S3fsCurl::AbortMultipartUpload(const char* tpath, string& upload_id)
if(!tpath){
return -1;
}
if(!CreateCurlHandle(true)){
if(!CreateCurlHandle()){
return -1;
}
string resource;
@ -3533,11 +3626,6 @@ int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, const st
free(md5raw);
}
// create handle
if(!CreateCurlHandle(true)){
return -1;
}
// make request
query_string = "partNumber=" + str(part_num) + "&uploadId=" + upload_id;
string urlargs = "?" + query_string;
@ -3566,18 +3654,8 @@ int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, const st
type = REQTYPE_UPLOADMULTIPOST;
insertAuthHeaders();
// setopt
curl_easy_setopt(hCurl, CURLOPT_URL, url.c_str());
curl_easy_setopt(hCurl, CURLOPT_UPLOAD, true); // HTTP PUT
curl_easy_setopt(hCurl, CURLOPT_WRITEDATA, (void*)bodydata);
curl_easy_setopt(hCurl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(hCurl, CURLOPT_HEADERDATA, (void*)&responseHeaders);
curl_easy_setopt(hCurl, CURLOPT_HEADERFUNCTION, HeaderCallback);
curl_easy_setopt(hCurl, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(partdata.size)); // Content-Length
curl_easy_setopt(hCurl, CURLOPT_READFUNCTION, S3fsCurl::UploadReadCallback);
curl_easy_setopt(hCurl, CURLOPT_READDATA, (void*)this);
curl_easy_setopt(hCurl, CURLOPT_HTTPHEADER, requestHeaders);
S3fsCurl::AddUserAgent(hCurl); // put User-Agent
// set razy function
fpRazySetup = UploadMultipartPostSetCurlOpts;
return 0;
}
@ -3615,9 +3693,6 @@ int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_
if(!from || !to){
return -1;
}
if(!CreateCurlHandle(true)){
return -1;
}
query_string = "partNumber=" + str(part_num) + "&uploadId=" + upload_id;
string urlargs = "?" + query_string;
string resource;
@ -3650,16 +3725,8 @@ int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_
type = REQTYPE_COPYMULTIPOST;
insertAuthHeaders();
// setopt
curl_easy_setopt(hCurl, CURLOPT_URL, url.c_str());
curl_easy_setopt(hCurl, CURLOPT_UPLOAD, true); // HTTP PUT
curl_easy_setopt(hCurl, CURLOPT_WRITEDATA, (void*)bodydata);
curl_easy_setopt(hCurl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(hCurl, CURLOPT_HEADERDATA, (void*)headdata);
curl_easy_setopt(hCurl, CURLOPT_HEADERFUNCTION, WriteMemoryCallback);
curl_easy_setopt(hCurl, CURLOPT_INFILESIZE, 0); // Content-Length
curl_easy_setopt(hCurl, CURLOPT_HTTPHEADER, requestHeaders);
S3fsCurl::AddUserAgent(hCurl); // put User-Agent
// set razy function
fpRazySetup = CopyMultipartPostSetCurlOpts;
// request
S3FS_PRN_INFO3("copying... [from=%s][to=%s][part=%d]", from, to, part_num);
@ -3972,23 +4039,23 @@ S3fsMultiCurl::~S3fsMultiCurl()
bool S3fsMultiCurl::ClearEx(bool is_all)
{
s3fscurlmap_t::iterator iter;
for(iter = cMap_req.begin(); iter != cMap_req.end(); ++iter){
S3fsCurl* s3fscurl = (*iter).second;
s3fscurllist_t::iterator iter;
for(iter = clist_req.begin(); iter != clist_req.end(); ++iter){
S3fsCurl* s3fscurl = *iter;
if(s3fscurl){
s3fscurl->DestroyCurlHandle();
delete s3fscurl; // with destroy curl handle.
}
}
cMap_req.clear();
clist_req.clear();
if(is_all){
for(iter = cMap_all.begin(); iter != cMap_all.end(); ++iter){
S3fsCurl* s3fscurl = (*iter).second;
for(iter = clist_all.begin(); iter != clist_all.end(); ++iter){
S3fsCurl* s3fscurl = *iter;
s3fscurl->DestroyCurlHandle();
delete s3fscurl;
}
cMap_all.clear();
clist_all.clear();
}
S3FS_MALLOCTRIM(0);
@ -4015,10 +4082,8 @@ bool S3fsMultiCurl::SetS3fsCurlObject(S3fsCurl* s3fscurl)
if(!s3fscurl){
return false;
}
if(cMap_all.end() != cMap_all.find(s3fscurl->hCurl)){
return false;
}
cMap_all[s3fscurl->hCurl] = s3fscurl;
clist_all.push_back(s3fscurl);
return true;
}
@ -4030,9 +4095,9 @@ int S3fsMultiCurl::MultiPerform()
Semaphore sem(GetMaxParallelism());
int rc;
for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); ++iter) {
for(s3fscurllist_t::iterator iter = clist_req.begin(); iter != clist_req.end(); ++iter) {
pthread_t thread;
S3fsCurl* s3fscurl = (*iter).second;
S3fsCurl* s3fscurl = *iter;
s3fscurl->sem = &sem;
s3fscurl->completed_tids_lock = &completed_tids_lock;
s3fscurl->completed_tids = &completed_tids;
@ -4096,13 +4161,13 @@ int S3fsMultiCurl::MultiPerform()
int S3fsMultiCurl::MultiRead()
{
for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); ++iter) {
S3fsCurl* s3fscurl = (*iter).second;
for(s3fscurllist_t::iterator iter = clist_req.begin(); iter != clist_req.end(); ++iter) {
S3fsCurl* s3fscurl = *iter;
bool isRetry = false;
long responseCode = -1;
if(s3fscurl->GetResponseCode(responseCode)){
if(s3fscurl->GetResponseCode(responseCode, false)){
if(400 > responseCode){
// add into stat cache
if(SuccessCallback && !SuccessCallback(s3fscurl)){
@ -4133,7 +4198,6 @@ int S3fsMultiCurl::MultiRead()
S3FS_PRN_ERR("failed a request(Unknown response code: %s)", s3fscurl->url.c_str());
}
if(!isRetry){
s3fscurl->DestroyCurlHandle();
delete s3fscurl;
@ -4145,7 +4209,7 @@ int S3fsMultiCurl::MultiRead()
if(RetryCallback){
retrycurl = RetryCallback(s3fscurl);
if(NULL != retrycurl){
cMap_all[retrycurl->hCurl] = retrycurl;
clist_all.push_back(retrycurl);
}else{
// Could not set up callback.
return -EIO;
@ -4157,31 +4221,29 @@ int S3fsMultiCurl::MultiRead()
}
}
}
cMap_req.clear();
clist_req.clear();
return 0;
}
int S3fsMultiCurl::Request()
{
S3FS_PRN_INFO3("[count=%zu]", cMap_all.size());
S3FS_PRN_INFO3("[count=%zu]", clist_all.size());
// Make request list.
//
// Send multi request loop( with retry )
// (When many request is sends, sometimes gets "Couldn't connect to server")
//
while(!cMap_all.empty()){
while(!clist_all.empty()){
// set curl handle to multi handle
int result;
s3fscurlmap_t::iterator iter;
for(iter = cMap_all.begin(); iter != cMap_all.end(); ++iter){
CURL* hCurl = (*iter).first;
S3fsCurl* s3fscurl = (*iter).second;
cMap_req[hCurl] = s3fscurl;
int result;
s3fscurllist_t::iterator iter;
for(iter = clist_all.begin(); iter != clist_all.end(); ++iter){
S3fsCurl* s3fscurl = *iter;
clist_req.push_back(s3fscurl);
}
cMap_all.clear();
clist_all.clear();
// Send multi request.
if(0 != (result = MultiPerform())){
@ -4202,12 +4264,27 @@ int S3fsMultiCurl::Request()
}
// thread function for performing an S3fsCurl request
void* S3fsMultiCurl::RequestPerformWrapper(void* arg) {
S3fsCurl* s3fscurl = static_cast<S3fsCurl*>(arg);
void *result = (void*)(intptr_t)(s3fscurl->RequestPerform());
AutoLock lock(s3fscurl->completed_tids_lock);
//
void* S3fsMultiCurl::RequestPerformWrapper(void* arg)
{
S3fsCurl* s3fscurl= static_cast<S3fsCurl*>(arg);
void* result = NULL;
if(s3fscurl && s3fscurl->fpRazySetup){
if(!s3fscurl->fpRazySetup(s3fscurl)){
S3FS_PRN_ERR("Failed to razy setup, then respond EIO.");
result = (void*)(intptr_t)(-EIO);
}
}
if(!result){
result = (void*)(intptr_t)(s3fscurl->RequestPerform());
s3fscurl->DestroyCurlHandle(true, false);
}
AutoLock lock(s3fscurl->completed_tids_lock);
s3fscurl->completed_tids->push_back(pthread_self());
s3fscurl->sem->post();
return result;
}

View File

@ -128,14 +128,12 @@ class S3fsMultiCurl;
//----------------------------------------------
// class CurlHandlerPool
//----------------------------------------------
typedef std::list<CURL*> hcurllist_t;
class CurlHandlerPool
{
public:
explicit CurlHandlerPool(int maxHandlers)
: mMaxHandlers(maxHandlers)
, mHandlers(NULL)
, mIndex(-1)
explicit CurlHandlerPool(int maxHandlers) : mMaxHandlers(maxHandlers)
{
assert(maxHandlers > 0);
}
@ -143,20 +141,23 @@ public:
bool Init();
bool Destroy();
CURL* GetHandler();
void ReturnHandler(CURL* h);
CURL* GetHandler(bool only_pool);
void ReturnHandler(CURL* hCurl, bool restore_pool);
private:
int mMaxHandlers;
int mMaxHandlers;
pthread_mutex_t mLock;
CURL** mHandlers;
int mIndex;
hcurllist_t mPool;
};
//----------------------------------------------
// class S3fsCurl
//----------------------------------------------
class S3fsCurl;
// Prototype function for razy setup options for curl handle
typedef bool (*s3fscurl_razy_setup)(S3fsCurl* s3fscurl);
typedef std::map<std::string, std::string> iamcredmap_t;
typedef std::map<std::string, std::string> sseckeymap_t;
typedef std::list<sseckeymap_t> sseckeylist_t;
@ -284,6 +285,7 @@ class S3fsCurl
Semaphore *sem;
pthread_mutex_t *completed_tids_lock;
std::vector<pthread_t> *completed_tids;
s3fscurl_razy_setup fpRazySetup; // curl options for razy setting function
public:
// constructor/destructor
@ -316,6 +318,12 @@ class S3fsCurl
static S3fsCurl* CopyMultipartPostRetryCallback(S3fsCurl* s3fscurl);
static S3fsCurl* ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl);
// razy functions for set curl options
static bool UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl);
static bool CopyMultipartPostSetCurlOpts(S3fsCurl* s3fscurl);
static bool PreGetObjectRequestSetCurlOpts(S3fsCurl* s3fscurl);
static bool PreHeadRequestSetCurlOpts(S3fsCurl* s3fscurl);
static bool ParseIAMCredentialResponse(const char* response, iamcredmap_t& keyval);
static bool SetIAMCredentials(const char* response);
static bool ParseIAMRoleFromMetaDataResponse(const char* response, std::string& rolename);
@ -418,12 +426,12 @@ class S3fsCurl
static void InitUserAgent(void);
// methods
bool CreateCurlHandle(bool force = false);
bool DestroyCurlHandle(bool force = false);
bool CreateCurlHandle(bool only_pool = false, bool remake = false);
bool DestroyCurlHandle(bool restore_pool = true, bool clear_internal_data = true);
bool LoadIAMRoleFromMetaData(void);
bool AddSseRequestHead(sse_type_t ssetype, std::string& ssevalue, bool is_only_c, bool is_copy);
bool GetResponseCode(long& responseCode);
bool GetResponseCode(long& responseCode, bool from_curl_handle = true);
int RequestPerform(void);
int DeleteRequest(const char* tpath);
bool PreHeadRequest(const char* tpath, const char* bpath = NULL, const char* savedpath = NULL, int ssekey_pos = -1);
@ -473,7 +481,7 @@ class S3fsCurl
//----------------------------------------------
// Class for lapping multi curl
//
typedef std::map<CURL*, S3fsCurl*> s3fscurlmap_t;
typedef std::vector<S3fsCurl*> s3fscurllist_t;
typedef bool (*S3fsMultiSuccessCallback)(S3fsCurl* s3fscurl); // callback for succeed multi request
typedef S3fsCurl* (*S3fsMultiRetryCallback)(S3fsCurl* s3fscurl); // callback for failure and retrying
@ -482,8 +490,8 @@ class S3fsMultiCurl
private:
const int maxParallelism;
s3fscurlmap_t cMap_all; // all of curl requests
s3fscurlmap_t cMap_req; // curl requests are sent
s3fscurllist_t clist_all; // all of curl requests
s3fscurllist_t clist_req; // curl requests are sent
S3fsMultiSuccessCallback SuccessCallback;
S3fsMultiRetryCallback RetryCallback;