Improvement of curl session pool for multipart

Improvement of curl session pool for multipart(2)

Improvement of curl session pool for multipart(3)
This commit is contained in:
Takeshi Nakatani 2019-02-25 12:47:10 +00:00
parent 77993e607e
commit 10d9f75366
2 changed files with 268 additions and 183 deletions

View File

@ -238,7 +238,6 @@ const char* BodyData::str() const
//------------------------------------------------------------------- //-------------------------------------------------------------------
// Class CurlHandlerPool // Class CurlHandlerPool
//------------------------------------------------------------------- //-------------------------------------------------------------------
bool CurlHandlerPool::Init() bool CurlHandlerPool::Init()
{ {
if (0 != pthread_mutex_init(&mLock, NULL)) { if (0 != pthread_mutex_init(&mLock, NULL)) {
@ -246,14 +245,14 @@ bool CurlHandlerPool::Init()
return false; return false;
} }
mHandlers = new CURL*[mMaxHandlers](); // this will init the array to 0 for(int cnt = 0; cnt < mMaxHandlers; ++cnt){
for (int i = 0; i < mMaxHandlers; ++i, ++mIndex) { CURL* hCurl = curl_easy_init();
mHandlers[i] = curl_easy_init(); if(!hCurl){
if (!mHandlers[i]) {
S3FS_PRN_ERR("Init curl handlers pool failed"); S3FS_PRN_ERR("Init curl handlers pool failed");
Destroy(); Destroy();
return false; return false;
} }
mPool.push_back(hCurl);
} }
return true; return true;
@ -261,13 +260,13 @@ bool CurlHandlerPool::Init()
bool CurlHandlerPool::Destroy() bool CurlHandlerPool::Destroy()
{ {
assert(mIndex >= -1 && mIndex < mMaxHandlers); while(!mPool.empty()){
CURL* hCurl = mPool.back();
for (int i = 0; i <= mIndex; ++i) { mPool.pop_back();
curl_easy_cleanup(mHandlers[i]); if(hCurl){
curl_easy_cleanup(hCurl);
}
} }
delete[] mHandlers;
if (0 != pthread_mutex_destroy(&mLock)) { if (0 != pthread_mutex_destroy(&mLock)) {
S3FS_PRN_ERR("Destroy curl handlers lock failed"); S3FS_PRN_ERR("Destroy curl handlers lock failed");
return false; return false;
@ -276,47 +275,51 @@ bool CurlHandlerPool::Destroy()
return true; return true;
} }
CURL* CurlHandlerPool::GetHandler() CURL* CurlHandlerPool::GetHandler(bool only_pool)
{ {
CURL* h = NULL; CURL* hCurl = NULL;
assert(mIndex >= -1 && mIndex < mMaxHandlers);
{ {
AutoLock lock(&mLock); AutoLock lock(&mLock);
if (mIndex >= 0) {
S3FS_PRN_DBG("Get handler from pool: %d", mIndex); if(!mPool.empty()){
h = mHandlers[mIndex--]; hCurl = mPool.back();
mPool.pop_back();
S3FS_PRN_DBG("Get handler from pool: rest = %d", static_cast<int>(mPool.size()));
} }
} }
if(only_pool){
if (!h) { return hCurl;
S3FS_PRN_INFO("Pool empty: create new handler");
h = curl_easy_init();
} }
if(!hCurl){
return h; S3FS_PRN_INFO("Pool empty: force to create new handler");
hCurl = curl_easy_init();
}
return hCurl;
} }
void CurlHandlerPool::ReturnHandler(CURL* h) void CurlHandlerPool::ReturnHandler(CURL* hCurl, bool restore_pool)
{ {
bool needCleanup = true; if(!hCurl){
return;
assert(mIndex >= -1 && mIndex < mMaxHandlers);
{
AutoLock lock(&mLock);
if (mIndex < mMaxHandlers - 1) {
mHandlers[++mIndex] = h;
curl_easy_reset(h);
needCleanup = false;
S3FS_PRN_DBG("Return handler to pool: %d", mIndex);
}
} }
if (needCleanup) { if(restore_pool){
AutoLock lock(&mLock);
S3FS_PRN_DBG("Return handler to pool");
mPool.push_back(hCurl);
while(mMaxHandlers <= static_cast<int>(mPool.size())){
CURL* hOldCurl = mPool.front();
mPool.pop_front();
if(hOldCurl){
S3FS_PRN_INFO("Pool full: destroy the oldest handler");
curl_easy_cleanup(hOldCurl);
}
}
}else{
S3FS_PRN_INFO("Pool full: destroy the handler"); S3FS_PRN_INFO("Pool full: destroy the handler");
curl_easy_cleanup(h); curl_easy_cleanup(hCurl);
} }
} }
@ -409,6 +412,12 @@ bool S3fsCurl::InitS3fsCurl(const char* MimeFile)
if(!S3fsCurl::InitCryptMutex()){ if(!S3fsCurl::InitCryptMutex()){
return false; return false;
} }
// [NOTE]
// sCurlPoolSize must be over parrallel(or multireq) count.
//
if(sCurlPoolSize < std::max(GetMaxParallelCount(), GetMaxMultiRequest())){
sCurlPoolSize = std::max(GetMaxParallelCount(), GetMaxMultiRequest());
}
sCurlPool = new CurlHandlerPool(sCurlPoolSize); sCurlPool = new CurlHandlerPool(sCurlPoolSize);
if (!sCurlPool->Init()) { if (!sCurlPool->Init()) {
return false; return false;
@ -1528,6 +1537,91 @@ int S3fsCurl::ParallelGetObjectRequest(const char* tpath, int fd, off_t start, s
return result; return result;
} }
bool S3fsCurl::UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return false;
}
if(!s3fscurl->CreateCurlHandle()){
return false;
}
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_URL, s3fscurl->url.c_str());
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_UPLOAD, true); // HTTP PUT
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEDATA, (void*)(s3fscurl->bodydata));
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HEADERDATA, (void*)&(s3fscurl->responseHeaders));
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HEADERFUNCTION, HeaderCallback);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(s3fscurl->partdata.size)); // Content-Length
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_READFUNCTION, UploadReadCallback);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_READDATA, (void*)s3fscurl);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HTTPHEADER, s3fscurl->requestHeaders);
S3fsCurl::AddUserAgent(s3fscurl->hCurl); // put User-Agent
return true;
}
bool S3fsCurl::CopyMultipartPostSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return false;
}
if(!s3fscurl->CreateCurlHandle()){
return false;
}
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_URL, s3fscurl->url.c_str());
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_UPLOAD, true); // HTTP PUT
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEDATA, (void*)(s3fscurl->bodydata));
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HEADERDATA, (void*)(s3fscurl->headdata));
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HEADERFUNCTION, WriteMemoryCallback);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_INFILESIZE, 0); // Content-Length
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HTTPHEADER, s3fscurl->requestHeaders);
S3fsCurl::AddUserAgent(s3fscurl->hCurl); // put User-Agent
return true;
}
bool S3fsCurl::PreGetObjectRequestSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return false;
}
if(!s3fscurl->CreateCurlHandle()){
return false;
}
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_URL, s3fscurl->url.c_str());
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HTTPHEADER, s3fscurl->requestHeaders);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEFUNCTION, DownloadWriteCallback);
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEDATA, (void*)s3fscurl);
S3fsCurl::AddUserAgent(s3fscurl->hCurl); // put User-Agent
return true;
}
bool S3fsCurl::PreHeadRequestSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return false;
}
if(!s3fscurl->CreateCurlHandle()){
return false;
}
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_URL, s3fscurl->url.c_str());
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_NOBODY, true); // HEAD
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_FILETIME, true); // Last-Modified
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HTTPHEADER, s3fscurl->requestHeaders);
// responseHeaders
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HEADERDATA, (void*)&(s3fscurl->responseHeaders));
curl_easy_setopt(s3fscurl->hCurl, CURLOPT_HEADERFUNCTION, HeaderCallback);
S3fsCurl::AddUserAgent(s3fscurl->hCurl); // put User-Agent
return true;
}
bool S3fsCurl::ParseIAMCredentialResponse(const char* response, iamcredmap_t& keyval) bool S3fsCurl::ParseIAMCredentialResponse(const char* response, iamcredmap_t& keyval)
{ {
if(!response){ if(!response){
@ -1735,7 +1829,7 @@ S3fsCurl::S3fsCurl(bool ahbe) :
bodydata(NULL), headdata(NULL), LastResponseCode(-1), postdata(NULL), postdata_remaining(0), is_use_ahbe(ahbe), bodydata(NULL), headdata(NULL), LastResponseCode(-1), postdata(NULL), postdata_remaining(0), is_use_ahbe(ahbe),
retry_count(0), b_infile(NULL), b_postdata(NULL), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0), retry_count(0), b_infile(NULL), b_postdata(NULL), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0),
b_ssekey_pos(-1), b_ssevalue(""), b_ssetype(SSE_DISABLE), op(""), query_string(""), b_ssekey_pos(-1), b_ssevalue(""), b_ssetype(SSE_DISABLE), op(""), query_string(""),
sem(NULL), completed_tids_lock(NULL), completed_tids(NULL) sem(NULL), completed_tids_lock(NULL), completed_tids(NULL), fpRazySetup(NULL)
{ {
} }
@ -1754,6 +1848,9 @@ bool S3fsCurl::ResetHandle()
curl_easy_setopt(hCurl, CURLOPT_PROGRESSFUNCTION, S3fsCurl::CurlProgress); curl_easy_setopt(hCurl, CURLOPT_PROGRESSFUNCTION, S3fsCurl::CurlProgress);
curl_easy_setopt(hCurl, CURLOPT_PROGRESSDATA, hCurl); curl_easy_setopt(hCurl, CURLOPT_PROGRESSDATA, hCurl);
// curl_easy_setopt(hCurl, CURLOPT_FORBID_REUSE, 1); // curl_easy_setopt(hCurl, CURLOPT_FORBID_REUSE, 1);
curl_easy_setopt(hCurl, CURLOPT_TCP_KEEPALIVE, 1);
// curl_easy_setopt(hCurl, CURLOPT_KEEP_SENDING_ON_ERROR, 1); // after 7.51.0
// curl_easy_setopt(hCurl, CURLOPT_SSL_ENABLE_ALPN, 0); // after 7.36.0 for disable ALPN for s3 server
if(type != REQTYPE_IAMCRED && type != REQTYPE_IAMROLE){ if(type != REQTYPE_IAMCRED && type != REQTYPE_IAMROLE){
// REQTYPE_IAMCRED and REQTYPE_IAMROLE are always HTTP // REQTYPE_IAMCRED and REQTYPE_IAMROLE are always HTTP
@ -1788,25 +1885,30 @@ bool S3fsCurl::ResetHandle()
return true; return true;
} }
bool S3fsCurl::CreateCurlHandle(bool force) bool S3fsCurl::CreateCurlHandle(bool only_pool, bool remake)
{ {
AutoLock lock(&S3fsCurl::curl_handles_lock); AutoLock lock(&S3fsCurl::curl_handles_lock);
if(hCurl){ if(hCurl && remake){
if(!force){ if(!DestroyCurlHandle(false)){
S3FS_PRN_WARN("already create handle.");
return false;
}
if(!DestroyCurlHandle(true)){
S3FS_PRN_ERR("could not destroy handle."); S3FS_PRN_ERR("could not destroy handle.");
return false; return false;
} }
S3FS_PRN_INFO3("already has handle, so destroyed it."); S3FS_PRN_INFO3("already has handle, so destroyed it or restored it to pool.");
} }
if(NULL == (hCurl = sCurlPool->GetHandler())){ if(!hCurl){
S3FS_PRN_ERR("Failed to create handle."); if(NULL == (hCurl = sCurlPool->GetHandler(only_pool))){
return false; if(!only_pool){
S3FS_PRN_ERR("Failed to create handle.");
return false;
}else{
// [NOTE]
// urther initialization processing is left to razy processing to be executed later.
// (Currently we do not use only_pool=true, but this code is remained for the future)
return true;
}
}
} }
// [NOTE] // [NOTE]
@ -1823,20 +1925,18 @@ bool S3fsCurl::CreateCurlHandle(bool force)
return true; return true;
} }
bool S3fsCurl::DestroyCurlHandle(bool force) bool S3fsCurl::DestroyCurlHandle(bool restore_pool, bool clear_internal_data)
{ {
ClearInternalData(); if(clear_internal_data){
ClearInternalData();
}
if(hCurl){ if(hCurl){
AutoLock lock(&S3fsCurl::curl_handles_lock); AutoLock lock(&S3fsCurl::curl_handles_lock);
S3fsCurl::curl_times.erase(hCurl); S3fsCurl::curl_times.erase(hCurl);
S3fsCurl::curl_progress.erase(hCurl); S3fsCurl::curl_progress.erase(hCurl);
if(retry_count == 0 || force){ sCurlPool->ReturnHandler(hCurl, restore_pool);
sCurlPool->ReturnHandler(hCurl);
}else{
curl_easy_cleanup(hCurl);
}
hCurl = NULL; hCurl = NULL;
}else{ }else{
return false; return false;
@ -1879,6 +1979,8 @@ bool S3fsCurl::ClearInternalData()
b_partdata_size = 0; b_partdata_size = 0;
partdata.clear(); partdata.clear();
fpRazySetup = NULL;
S3FS_MALLOCTRIM(0); S3FS_MALLOCTRIM(0);
return true; return true;
@ -1891,16 +1993,21 @@ bool S3fsCurl::SetUseAhbe(bool ahbe)
return old; return old;
} }
bool S3fsCurl::GetResponseCode(long& responseCode) bool S3fsCurl::GetResponseCode(long& responseCode, bool from_curl_handle)
{ {
if(!hCurl){
return false;
}
responseCode = -1; responseCode = -1;
if(CURLE_OK != curl_easy_getinfo(hCurl, CURLINFO_RESPONSE_CODE, &LastResponseCode)){
return false; if(!from_curl_handle){
responseCode = LastResponseCode;
}else{
if(!hCurl){
return false;
}
if(CURLE_OK != curl_easy_getinfo(hCurl, CURLINFO_RESPONSE_CODE, &LastResponseCode)){
return false;
}
responseCode = LastResponseCode;
} }
responseCode = LastResponseCode;
return true; return true;
} }
@ -1945,12 +2052,7 @@ bool S3fsCurl::RemakeHandle()
partdata.size = b_partdata_size; partdata.size = b_partdata_size;
// reset handle // reset handle
curl_easy_cleanup(hCurl);
hCurl = curl_easy_init();
ResetHandle(); ResetHandle();
// disable ssl cache, so that a new session will be created
curl_easy_setopt(hCurl, CURLOPT_SSL_SESSIONID_CACHE, 0);
curl_easy_setopt(hCurl, CURLOPT_SHARE, NULL);
// set options // set options
switch(type){ switch(type){
@ -2558,7 +2660,7 @@ int S3fsCurl::DeleteRequest(const char* tpath)
if(!tpath){ if(!tpath){
return -1; return -1;
} }
if(!CreateCurlHandle(true)){ if(!CreateCurlHandle()){
return -1; return -1;
} }
string resource; string resource;
@ -2600,7 +2702,7 @@ int S3fsCurl::GetIAMCredentials()
// at first set type for handle // at first set type for handle
type = REQTYPE_IAMCRED; type = REQTYPE_IAMCRED;
if(!CreateCurlHandle(true)){ if(!CreateCurlHandle()){
return -EIO; return -EIO;
} }
@ -2668,7 +2770,7 @@ bool S3fsCurl::LoadIAMRoleFromMetaData()
// at first set type for handle // at first set type for handle
type = REQTYPE_IAMROLE; type = REQTYPE_IAMROLE;
if(!CreateCurlHandle(true)){ if(!CreateCurlHandle()){
return false; return false;
} }
@ -2744,9 +2846,6 @@ bool S3fsCurl::PreHeadRequest(const char* tpath, const char* bpath, const char*
if(!tpath){ if(!tpath){
return false; return false;
} }
if(!CreateCurlHandle(true)){
return false;
}
string resource; string resource;
string turl; string turl;
MakeUrlResource(get_realpath(tpath).c_str(), resource, turl); MakeUrlResource(get_realpath(tpath).c_str(), resource, turl);
@ -2773,15 +2872,8 @@ bool S3fsCurl::PreHeadRequest(const char* tpath, const char* bpath, const char*
type = REQTYPE_HEAD; type = REQTYPE_HEAD;
insertAuthHeaders(); insertAuthHeaders();
curl_easy_setopt(hCurl, CURLOPT_URL, url.c_str()); // set razy function
curl_easy_setopt(hCurl, CURLOPT_NOBODY, true); // HEAD fpRazySetup = PreHeadRequestSetCurlOpts;
curl_easy_setopt(hCurl, CURLOPT_FILETIME, true); // Last-Modified
curl_easy_setopt(hCurl, CURLOPT_HTTPHEADER, requestHeaders);
// responseHeaders
curl_easy_setopt(hCurl, CURLOPT_HEADERDATA, (void*)&responseHeaders);
curl_easy_setopt(hCurl, CURLOPT_HEADERFUNCTION, HeaderCallback);
S3fsCurl::AddUserAgent(hCurl); // put User-Agent
return true; return true;
} }
@ -2793,7 +2885,7 @@ int S3fsCurl::HeadRequest(const char* tpath, headers_t& meta)
S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath)); S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath));
// At first, try to get without SSE-C headers // At first, try to get without SSE-C headers
if(!PreHeadRequest(tpath) || 0 != (result = RequestPerform())){ if(!PreHeadRequest(tpath) || !fpRazySetup || !fpRazySetup(this) || 0 != (result = RequestPerform())){
// If has SSE-C keys, try to get with all SSE-C keys. // If has SSE-C keys, try to get with all SSE-C keys.
for(int pos = 0; static_cast<size_t>(pos) < S3fsCurl::sseckeys.size(); pos++){ for(int pos = 0; static_cast<size_t>(pos) < S3fsCurl::sseckeys.size(); pos++){
if(!DestroyCurlHandle()){ if(!DestroyCurlHandle()){
@ -2802,6 +2894,10 @@ int S3fsCurl::HeadRequest(const char* tpath, headers_t& meta)
if(!PreHeadRequest(tpath, NULL, NULL, pos)){ if(!PreHeadRequest(tpath, NULL, NULL, pos)){
break; break;
} }
if(!fpRazySetup || !fpRazySetup(this)){
S3FS_PRN_ERR("Failed to razy setup in single head request.");
break;
}
if(0 == (result = RequestPerform())){ if(0 == (result = RequestPerform())){
break; break;
} }
@ -2840,7 +2936,7 @@ int S3fsCurl::PutHeadRequest(const char* tpath, headers_t& meta, bool is_copy)
if(!tpath){ if(!tpath){
return -1; return -1;
} }
if(!CreateCurlHandle(true)){ if(!CreateCurlHandle()){
return -1; return -1;
} }
string resource; string resource;
@ -2976,7 +3072,7 @@ int S3fsCurl::PutRequest(const char* tpath, headers_t& meta, int fd)
S3FS_PRN_INFO3("create zero byte file object."); S3FS_PRN_INFO3("create zero byte file object.");
} }
if(!CreateCurlHandle(true)){ if(!CreateCurlHandle()){
if(file){ if(file){
fclose(file); fclose(file);
} }
@ -3075,9 +3171,6 @@ int S3fsCurl::PreGetObjectRequest(const char* tpath, int fd, off_t start, ssize_
return -1; return -1;
} }
if(!CreateCurlHandle(true)){
return -1;
}
string resource; string resource;
string turl; string turl;
MakeUrlResource(get_realpath(tpath).c_str(), resource, turl); MakeUrlResource(get_realpath(tpath).c_str(), resource, turl);
@ -3103,12 +3196,8 @@ int S3fsCurl::PreGetObjectRequest(const char* tpath, int fd, off_t start, ssize_
type = REQTYPE_GET; type = REQTYPE_GET;
insertAuthHeaders(); insertAuthHeaders();
// setopt // set razy function
curl_easy_setopt(hCurl, CURLOPT_URL, url.c_str()); fpRazySetup = PreGetObjectRequestSetCurlOpts;
curl_easy_setopt(hCurl, CURLOPT_HTTPHEADER, requestHeaders);
curl_easy_setopt(hCurl, CURLOPT_WRITEFUNCTION, S3fsCurl::DownloadWriteCallback);
curl_easy_setopt(hCurl, CURLOPT_WRITEDATA, (void*)this);
S3fsCurl::AddUserAgent(hCurl); // put User-Agent
// set info for callback func. // set info for callback func.
// (use only fd, startpos and size, other member is not used.) // (use only fd, startpos and size, other member is not used.)
@ -3143,6 +3232,10 @@ int S3fsCurl::GetObjectRequest(const char* tpath, int fd, off_t start, ssize_t s
if(0 != (result = PreGetObjectRequest(tpath, fd, start, size, ssetype, ssevalue))){ if(0 != (result = PreGetObjectRequest(tpath, fd, start, size, ssetype, ssevalue))){
return result; return result;
} }
if(!fpRazySetup || !fpRazySetup(this)){
S3FS_PRN_ERR("Failed to razy setup in single get object request.");
return -1;
}
S3FS_PRN_INFO3("downloading... [path=%s][fd=%d]", tpath, fd); S3FS_PRN_INFO3("downloading... [path=%s][fd=%d]", tpath, fd);
@ -3156,7 +3249,7 @@ int S3fsCurl::CheckBucket()
{ {
S3FS_PRN_INFO3("check a bucket."); S3FS_PRN_INFO3("check a bucket.");
if(!CreateCurlHandle(true)){ if(!CreateCurlHandle()){
return -1; return -1;
} }
string resource; string resource;
@ -3194,7 +3287,7 @@ int S3fsCurl::ListBucketRequest(const char* tpath, const char* query)
if(!tpath){ if(!tpath){
return -1; return -1;
} }
if(!CreateCurlHandle(true)){ if(!CreateCurlHandle()){
return -1; return -1;
} }
string resource; string resource;
@ -3242,7 +3335,7 @@ int S3fsCurl::PreMultipartPostRequest(const char* tpath, headers_t& meta, string
if(!tpath){ if(!tpath){
return -1; return -1;
} }
if(!CreateCurlHandle(true)){ if(!CreateCurlHandle()){
return -1; return -1;
} }
string resource; string resource;
@ -3374,7 +3467,7 @@ int S3fsCurl::CompleteMultipartPostRequest(const char* tpath, string& upload_id,
postdata_remaining = postContent.size(); // without null postdata_remaining = postContent.size(); // without null
b_postdata_remaining = postdata_remaining; b_postdata_remaining = postdata_remaining;
if(!CreateCurlHandle(true)){ if(!CreateCurlHandle()){
return -1; return -1;
} }
string resource; string resource;
@ -3421,7 +3514,7 @@ int S3fsCurl::MultipartListRequest(string& body)
{ {
S3FS_PRN_INFO3("list request(multipart)"); S3FS_PRN_INFO3("list request(multipart)");
if(!CreateCurlHandle(true)){ if(!CreateCurlHandle()){
return -1; return -1;
} }
string resource; string resource;
@ -3468,7 +3561,7 @@ int S3fsCurl::AbortMultipartUpload(const char* tpath, string& upload_id)
if(!tpath){ if(!tpath){
return -1; return -1;
} }
if(!CreateCurlHandle(true)){ if(!CreateCurlHandle()){
return -1; return -1;
} }
string resource; string resource;
@ -3533,11 +3626,6 @@ int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, const st
free(md5raw); free(md5raw);
} }
// create handle
if(!CreateCurlHandle(true)){
return -1;
}
// make request // make request
query_string = "partNumber=" + str(part_num) + "&uploadId=" + upload_id; query_string = "partNumber=" + str(part_num) + "&uploadId=" + upload_id;
string urlargs = "?" + query_string; string urlargs = "?" + query_string;
@ -3566,18 +3654,8 @@ int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, const st
type = REQTYPE_UPLOADMULTIPOST; type = REQTYPE_UPLOADMULTIPOST;
insertAuthHeaders(); insertAuthHeaders();
// setopt // set razy function
curl_easy_setopt(hCurl, CURLOPT_URL, url.c_str()); fpRazySetup = UploadMultipartPostSetCurlOpts;
curl_easy_setopt(hCurl, CURLOPT_UPLOAD, true); // HTTP PUT
curl_easy_setopt(hCurl, CURLOPT_WRITEDATA, (void*)bodydata);
curl_easy_setopt(hCurl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(hCurl, CURLOPT_HEADERDATA, (void*)&responseHeaders);
curl_easy_setopt(hCurl, CURLOPT_HEADERFUNCTION, HeaderCallback);
curl_easy_setopt(hCurl, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(partdata.size)); // Content-Length
curl_easy_setopt(hCurl, CURLOPT_READFUNCTION, S3fsCurl::UploadReadCallback);
curl_easy_setopt(hCurl, CURLOPT_READDATA, (void*)this);
curl_easy_setopt(hCurl, CURLOPT_HTTPHEADER, requestHeaders);
S3fsCurl::AddUserAgent(hCurl); // put User-Agent
return 0; return 0;
} }
@ -3615,9 +3693,6 @@ int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_
if(!from || !to){ if(!from || !to){
return -1; return -1;
} }
if(!CreateCurlHandle(true)){
return -1;
}
query_string = "partNumber=" + str(part_num) + "&uploadId=" + upload_id; query_string = "partNumber=" + str(part_num) + "&uploadId=" + upload_id;
string urlargs = "?" + query_string; string urlargs = "?" + query_string;
string resource; string resource;
@ -3650,16 +3725,8 @@ int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_
type = REQTYPE_COPYMULTIPOST; type = REQTYPE_COPYMULTIPOST;
insertAuthHeaders(); insertAuthHeaders();
// setopt // set razy function
curl_easy_setopt(hCurl, CURLOPT_URL, url.c_str()); fpRazySetup = CopyMultipartPostSetCurlOpts;
curl_easy_setopt(hCurl, CURLOPT_UPLOAD, true); // HTTP PUT
curl_easy_setopt(hCurl, CURLOPT_WRITEDATA, (void*)bodydata);
curl_easy_setopt(hCurl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(hCurl, CURLOPT_HEADERDATA, (void*)headdata);
curl_easy_setopt(hCurl, CURLOPT_HEADERFUNCTION, WriteMemoryCallback);
curl_easy_setopt(hCurl, CURLOPT_INFILESIZE, 0); // Content-Length
curl_easy_setopt(hCurl, CURLOPT_HTTPHEADER, requestHeaders);
S3fsCurl::AddUserAgent(hCurl); // put User-Agent
// request // request
S3FS_PRN_INFO3("copying... [from=%s][to=%s][part=%d]", from, to, part_num); S3FS_PRN_INFO3("copying... [from=%s][to=%s][part=%d]", from, to, part_num);
@ -3972,23 +4039,23 @@ S3fsMultiCurl::~S3fsMultiCurl()
bool S3fsMultiCurl::ClearEx(bool is_all) bool S3fsMultiCurl::ClearEx(bool is_all)
{ {
s3fscurlmap_t::iterator iter; s3fscurllist_t::iterator iter;
for(iter = cMap_req.begin(); iter != cMap_req.end(); ++iter){ for(iter = clist_req.begin(); iter != clist_req.end(); ++iter){
S3fsCurl* s3fscurl = (*iter).second; S3fsCurl* s3fscurl = *iter;
if(s3fscurl){ if(s3fscurl){
s3fscurl->DestroyCurlHandle(); s3fscurl->DestroyCurlHandle();
delete s3fscurl; // with destroy curl handle. delete s3fscurl; // with destroy curl handle.
} }
} }
cMap_req.clear(); clist_req.clear();
if(is_all){ if(is_all){
for(iter = cMap_all.begin(); iter != cMap_all.end(); ++iter){ for(iter = clist_all.begin(); iter != clist_all.end(); ++iter){
S3fsCurl* s3fscurl = (*iter).second; S3fsCurl* s3fscurl = *iter;
s3fscurl->DestroyCurlHandle(); s3fscurl->DestroyCurlHandle();
delete s3fscurl; delete s3fscurl;
} }
cMap_all.clear(); clist_all.clear();
} }
S3FS_MALLOCTRIM(0); S3FS_MALLOCTRIM(0);
@ -4015,10 +4082,8 @@ bool S3fsMultiCurl::SetS3fsCurlObject(S3fsCurl* s3fscurl)
if(!s3fscurl){ if(!s3fscurl){
return false; return false;
} }
if(cMap_all.end() != cMap_all.find(s3fscurl->hCurl)){ clist_all.push_back(s3fscurl);
return false;
}
cMap_all[s3fscurl->hCurl] = s3fscurl;
return true; return true;
} }
@ -4030,9 +4095,9 @@ int S3fsMultiCurl::MultiPerform()
Semaphore sem(GetMaxParallelism()); Semaphore sem(GetMaxParallelism());
int rc; int rc;
for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); ++iter) { for(s3fscurllist_t::iterator iter = clist_req.begin(); iter != clist_req.end(); ++iter) {
pthread_t thread; pthread_t thread;
S3fsCurl* s3fscurl = (*iter).second; S3fsCurl* s3fscurl = *iter;
s3fscurl->sem = &sem; s3fscurl->sem = &sem;
s3fscurl->completed_tids_lock = &completed_tids_lock; s3fscurl->completed_tids_lock = &completed_tids_lock;
s3fscurl->completed_tids = &completed_tids; s3fscurl->completed_tids = &completed_tids;
@ -4096,13 +4161,13 @@ int S3fsMultiCurl::MultiPerform()
int S3fsMultiCurl::MultiRead() int S3fsMultiCurl::MultiRead()
{ {
for(s3fscurlmap_t::iterator iter = cMap_req.begin(); iter != cMap_req.end(); ++iter) { for(s3fscurllist_t::iterator iter = clist_req.begin(); iter != clist_req.end(); ++iter) {
S3fsCurl* s3fscurl = (*iter).second; S3fsCurl* s3fscurl = *iter;
bool isRetry = false; bool isRetry = false;
long responseCode = -1; long responseCode = -1;
if(s3fscurl->GetResponseCode(responseCode)){ if(s3fscurl->GetResponseCode(responseCode, false)){
if(400 > responseCode){ if(400 > responseCode){
// add into stat cache // add into stat cache
if(SuccessCallback && !SuccessCallback(s3fscurl)){ if(SuccessCallback && !SuccessCallback(s3fscurl)){
@ -4133,7 +4198,6 @@ int S3fsMultiCurl::MultiRead()
S3FS_PRN_ERR("failed a request(Unknown response code: %s)", s3fscurl->url.c_str()); S3FS_PRN_ERR("failed a request(Unknown response code: %s)", s3fscurl->url.c_str());
} }
if(!isRetry){ if(!isRetry){
s3fscurl->DestroyCurlHandle(); s3fscurl->DestroyCurlHandle();
delete s3fscurl; delete s3fscurl;
@ -4145,7 +4209,7 @@ int S3fsMultiCurl::MultiRead()
if(RetryCallback){ if(RetryCallback){
retrycurl = RetryCallback(s3fscurl); retrycurl = RetryCallback(s3fscurl);
if(NULL != retrycurl){ if(NULL != retrycurl){
cMap_all[retrycurl->hCurl] = retrycurl; clist_all.push_back(retrycurl);
}else{ }else{
// Could not set up callback. // Could not set up callback.
return -EIO; return -EIO;
@ -4157,31 +4221,29 @@ int S3fsMultiCurl::MultiRead()
} }
} }
} }
cMap_req.clear(); clist_req.clear();
return 0; return 0;
} }
int S3fsMultiCurl::Request() int S3fsMultiCurl::Request()
{ {
S3FS_PRN_INFO3("[count=%zu]", cMap_all.size()); S3FS_PRN_INFO3("[count=%zu]", clist_all.size());
// Make request list. // Make request list.
// //
// Send multi request loop( with retry ) // Send multi request loop( with retry )
// (When many request is sends, sometimes gets "Couldn't connect to server") // (When many request is sends, sometimes gets "Couldn't connect to server")
// //
while(!cMap_all.empty()){ while(!clist_all.empty()){
// set curl handle to multi handle // set curl handle to multi handle
int result; int result;
s3fscurlmap_t::iterator iter; s3fscurllist_t::iterator iter;
for(iter = cMap_all.begin(); iter != cMap_all.end(); ++iter){ for(iter = clist_all.begin(); iter != clist_all.end(); ++iter){
CURL* hCurl = (*iter).first; S3fsCurl* s3fscurl = *iter;
S3fsCurl* s3fscurl = (*iter).second; clist_req.push_back(s3fscurl);
cMap_req[hCurl] = s3fscurl;
} }
cMap_all.clear(); clist_all.clear();
// Send multi request. // Send multi request.
if(0 != (result = MultiPerform())){ if(0 != (result = MultiPerform())){
@ -4202,12 +4264,27 @@ int S3fsMultiCurl::Request()
} }
// thread function for performing an S3fsCurl request // thread function for performing an S3fsCurl request
void* S3fsMultiCurl::RequestPerformWrapper(void* arg) { //
S3fsCurl* s3fscurl = static_cast<S3fsCurl*>(arg); void* S3fsMultiCurl::RequestPerformWrapper(void* arg)
void *result = (void*)(intptr_t)(s3fscurl->RequestPerform()); {
AutoLock lock(s3fscurl->completed_tids_lock); S3fsCurl* s3fscurl= static_cast<S3fsCurl*>(arg);
void* result = NULL;
if(s3fscurl && s3fscurl->fpRazySetup){
if(!s3fscurl->fpRazySetup(s3fscurl)){
S3FS_PRN_ERR("Failed to razy setup, then respond EIO.");
result = (void*)(intptr_t)(-EIO);
}
}
if(!result){
result = (void*)(intptr_t)(s3fscurl->RequestPerform());
s3fscurl->DestroyCurlHandle(true, false);
}
AutoLock lock(s3fscurl->completed_tids_lock);
s3fscurl->completed_tids->push_back(pthread_self()); s3fscurl->completed_tids->push_back(pthread_self());
s3fscurl->sem->post(); s3fscurl->sem->post();
return result; return result;
} }

View File

@ -128,14 +128,12 @@ class S3fsMultiCurl;
//---------------------------------------------- //----------------------------------------------
// class CurlHandlerPool // class CurlHandlerPool
//---------------------------------------------- //----------------------------------------------
typedef std::list<CURL*> hcurllist_t;
class CurlHandlerPool class CurlHandlerPool
{ {
public: public:
explicit CurlHandlerPool(int maxHandlers) explicit CurlHandlerPool(int maxHandlers) : mMaxHandlers(maxHandlers)
: mMaxHandlers(maxHandlers)
, mHandlers(NULL)
, mIndex(-1)
{ {
assert(maxHandlers > 0); assert(maxHandlers > 0);
} }
@ -143,20 +141,23 @@ public:
bool Init(); bool Init();
bool Destroy(); bool Destroy();
CURL* GetHandler(); CURL* GetHandler(bool only_pool);
void ReturnHandler(CURL* h); void ReturnHandler(CURL* hCurl, bool restore_pool);
private: private:
int mMaxHandlers; int mMaxHandlers;
pthread_mutex_t mLock; pthread_mutex_t mLock;
CURL** mHandlers; hcurllist_t mPool;
int mIndex;
}; };
//---------------------------------------------- //----------------------------------------------
// class S3fsCurl // class S3fsCurl
//---------------------------------------------- //----------------------------------------------
class S3fsCurl;
// Prototype function for razy setup options for curl handle
typedef bool (*s3fscurl_razy_setup)(S3fsCurl* s3fscurl);
typedef std::map<std::string, std::string> iamcredmap_t; typedef std::map<std::string, std::string> iamcredmap_t;
typedef std::map<std::string, std::string> sseckeymap_t; typedef std::map<std::string, std::string> sseckeymap_t;
typedef std::list<sseckeymap_t> sseckeylist_t; typedef std::list<sseckeymap_t> sseckeylist_t;
@ -284,6 +285,7 @@ class S3fsCurl
Semaphore *sem; Semaphore *sem;
pthread_mutex_t *completed_tids_lock; pthread_mutex_t *completed_tids_lock;
std::vector<pthread_t> *completed_tids; std::vector<pthread_t> *completed_tids;
s3fscurl_razy_setup fpRazySetup; // curl options for razy setting function
public: public:
// constructor/destructor // constructor/destructor
@ -316,6 +318,12 @@ class S3fsCurl
static S3fsCurl* CopyMultipartPostRetryCallback(S3fsCurl* s3fscurl); static S3fsCurl* CopyMultipartPostRetryCallback(S3fsCurl* s3fscurl);
static S3fsCurl* ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl); static S3fsCurl* ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl);
// razy functions for set curl options
static bool UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl);
static bool CopyMultipartPostSetCurlOpts(S3fsCurl* s3fscurl);
static bool PreGetObjectRequestSetCurlOpts(S3fsCurl* s3fscurl);
static bool PreHeadRequestSetCurlOpts(S3fsCurl* s3fscurl);
static bool ParseIAMCredentialResponse(const char* response, iamcredmap_t& keyval); static bool ParseIAMCredentialResponse(const char* response, iamcredmap_t& keyval);
static bool SetIAMCredentials(const char* response); static bool SetIAMCredentials(const char* response);
static bool ParseIAMRoleFromMetaDataResponse(const char* response, std::string& rolename); static bool ParseIAMRoleFromMetaDataResponse(const char* response, std::string& rolename);
@ -418,12 +426,12 @@ class S3fsCurl
static void InitUserAgent(void); static void InitUserAgent(void);
// methods // methods
bool CreateCurlHandle(bool force = false); bool CreateCurlHandle(bool only_pool = false, bool remake = false);
bool DestroyCurlHandle(bool force = false); bool DestroyCurlHandle(bool restore_pool = true, bool clear_internal_data = true);
bool LoadIAMRoleFromMetaData(void); bool LoadIAMRoleFromMetaData(void);
bool AddSseRequestHead(sse_type_t ssetype, std::string& ssevalue, bool is_only_c, bool is_copy); bool AddSseRequestHead(sse_type_t ssetype, std::string& ssevalue, bool is_only_c, bool is_copy);
bool GetResponseCode(long& responseCode); bool GetResponseCode(long& responseCode, bool from_curl_handle = true);
int RequestPerform(void); int RequestPerform(void);
int DeleteRequest(const char* tpath); int DeleteRequest(const char* tpath);
bool PreHeadRequest(const char* tpath, const char* bpath = NULL, const char* savedpath = NULL, int ssekey_pos = -1); bool PreHeadRequest(const char* tpath, const char* bpath = NULL, const char* savedpath = NULL, int ssekey_pos = -1);
@ -473,7 +481,7 @@ class S3fsCurl
//---------------------------------------------- //----------------------------------------------
// Class for lapping multi curl // Class for lapping multi curl
// //
typedef std::map<CURL*, S3fsCurl*> s3fscurlmap_t; typedef std::vector<S3fsCurl*> s3fscurllist_t;
typedef bool (*S3fsMultiSuccessCallback)(S3fsCurl* s3fscurl); // callback for succeed multi request typedef bool (*S3fsMultiSuccessCallback)(S3fsCurl* s3fscurl); // callback for succeed multi request
typedef S3fsCurl* (*S3fsMultiRetryCallback)(S3fsCurl* s3fscurl); // callback for failure and retrying typedef S3fsCurl* (*S3fsMultiRetryCallback)(S3fsCurl* s3fscurl); // callback for failure and retrying
@ -482,8 +490,8 @@ class S3fsMultiCurl
private: private:
const int maxParallelism; const int maxParallelism;
s3fscurlmap_t cMap_all; // all of curl requests s3fscurllist_t clist_all; // all of curl requests
s3fscurlmap_t cMap_req; // curl requests are sent s3fscurllist_t clist_req; // curl requests are sent
S3fsMultiSuccessCallback SuccessCallback; S3fsMultiSuccessCallback SuccessCallback;
S3fsMultiRetryCallback RetryCallback; S3fsMultiRetryCallback RetryCallback;