Changed multiple Head requests from S3fsMultiCurl to ThreadPoolMan

This commit is contained in:
Takeshi Nakatani 2024-07-15 06:40:05 +00:00 committed by Andrew Gaul
parent efc23316e9
commit a1e47bc287
11 changed files with 374 additions and 243 deletions

View File

@ -55,6 +55,7 @@ s3fs_SOURCES = \
addhead.cpp \
sighandlers.cpp \
threadpoolman.cpp \
syncfiller.cpp \
common_auth.cpp
if USE_SSL_OPENSSL
s3fs_SOURCES += openssl_auth.cpp

View File

@ -709,6 +709,11 @@ int S3fsCurl::SetRetries(int count)
return old;
}
int S3fsCurl::GetRetries()
{
return S3fsCurl::retries;
}
bool S3fsCurl::SetPublicBucket(bool flag)
{
bool old = S3fsCurl::is_public_bucket;
@ -2132,8 +2137,6 @@ bool S3fsCurl::ClearInternalData()
//
type = REQTYPE::UNSET;
path = "";
base_path = "";
saved_path = "";
url = "";
op = "";
query_string= "";
@ -2144,7 +2147,6 @@ bool S3fsCurl::ClearInternalData()
responseHeaders.clear();
bodydata.clear();
headdata.clear();
LastResponseCode = S3FSCURL_RESPONSECODE_NOTSET;
postdata = nullptr;
postdata_remaining = 0;
retry_count = 0;
@ -3306,14 +3308,12 @@ bool S3fsCurl::AddSseRequestHead(sse_type_t ssetype, const std::string& input, b
//
// tpath : target path for head request
// bpath : saved into base_path
// savedpath : saved into saved_path
// ssekey_pos : -1 means "not" SSE-C type
// 0 - X means SSE-C type and position for SSE-C key(0 is latest key)
//
bool S3fsCurl::PreHeadRequest(const char* tpath, const char* bpath, const char* savedpath, size_t ssekey_pos)
bool S3fsCurl::PreHeadRequest(const char* tpath, size_t ssekey_pos)
{
S3FS_PRN_INFO3("[tpath=%s][bpath=%s][save=%s][sseckeypos=%zu]", SAFESTRPTR(tpath), SAFESTRPTR(bpath), SAFESTRPTR(savedpath), ssekey_pos);
S3FS_PRN_INFO3("[tpath=%s][sseckeypos=%zu]", SAFESTRPTR(tpath), ssekey_pos);
if(!tpath){
return false;
@ -3325,8 +3325,6 @@ bool S3fsCurl::PreHeadRequest(const char* tpath, const char* bpath, const char*
// libcurl 7.17 does deep copy of url, deep copy "stable" url
url = prepare_url(turl.c_str());
path = get_realpath(tpath);
base_path = SAFESTRPTR(bpath);
saved_path = SAFESTRPTR(savedpath);
requestHeaders = nullptr;
responseHeaders.clear();
@ -3362,7 +3360,7 @@ int S3fsCurl::HeadRequest(const char* tpath, headers_t& meta)
if(!DestroyCurlHandle()){
break;
}
if(!PreHeadRequest(tpath, nullptr, nullptr, pos)){
if(!PreHeadRequest(tpath, pos)){
break;
}
if(!fpLazySetup || !fpLazySetup(this)){

View File

@ -97,6 +97,8 @@ typedef std::vector<sseckeymap_t> sseckeylist_t;
//
class S3fsCurl
{
// [TODO]
// If S3fsMultiCurl is discontinued, the following friends will be deleted.
friend class S3fsMultiCurl;
private:
@ -175,8 +177,6 @@ class S3fsCurl
CurlUniquePtr hCurl PT_GUARDED_BY(curl_handles_lock) = {nullptr, curl_easy_cleanup};
REQTYPE type; // type of request
std::string path; // target object path
std::string base_path; // base path (for multi curl head request)
std::string saved_path; // saved path = cache key (for multi curl head request)
std::string url; // target object path(url)
struct curl_slist* requestHeaders;
headers_t responseHeaders; // header data by HeaderCallback
@ -187,7 +187,7 @@ class S3fsCurl
off_t postdata_remaining; // use by post method and read callback function.
filepart partdata; // use by multipart upload/get object callback
bool is_use_ahbe; // additional header by extension
int retry_count; // retry count for multipart
int retry_count; // retry count for multipart ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.)
std::unique_ptr<FILE, decltype(&s3fs_fclose)> b_infile = {nullptr, &s3fs_fclose}; // backup for retrying
const unsigned char* b_postdata; // backup for retrying
off_t b_postdata_remaining; // backup for retrying
@ -273,6 +273,10 @@ class S3fsCurl
void insertIBMIAMHeaders(const std::string& access_key_id, const std::string& access_token);
bool insertAuthHeaders();
bool AddSseRequestHead(sse_type_t ssetype, const std::string& ssevalue, bool is_copy);
bool PreHeadRequest(const char* tpath, size_t ssekey_pos = -1);
bool PreHeadRequest(const std::string& tpath, size_t ssekey_pos = -1) {
return PreHeadRequest(tpath.c_str(), ssekey_pos);
}
std::string CalcSignatureV2(const std::string& method, const std::string& strMD5, const std::string& content_type, const std::string& date, const std::string& resource, const std::string& secret_access_key, const std::string& access_token);
std::string CalcSignature(const std::string& method, const std::string& canonical_uri, const std::string& query_string, const std::string& strdate, const std::string& payload_hash, const std::string& date8601, const std::string& secret_access_key, const std::string& access_token);
int MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id);
@ -301,6 +305,7 @@ class S3fsCurl
static time_t SetReadwriteTimeout(time_t timeout);
static time_t GetReadwriteTimeout() { return S3fsCurl::readwrite_timeout; }
static int SetRetries(int count);
static int GetRetries();
static bool SetPublicBucket(bool flag);
static bool IsPublicBucket() { return S3fsCurl::is_public_bucket; }
static acl_t SetDefaultAcl(acl_t acl);
@ -367,10 +372,6 @@ class S3fsCurl
int RequestPerform(bool dontAddAuthHeaders=false);
int DeleteRequest(const char* tpath);
int GetIAMv2ApiToken(const char* token_url, int token_ttl, const char* token_ttl_hdr, std::string& response);
bool PreHeadRequest(const char* tpath, const char* bpath = nullptr, const char* savedpath = nullptr, size_t ssekey_pos = -1);
bool PreHeadRequest(const std::string& tpath, const std::string& bpath, const std::string& savedpath, size_t ssekey_pos = -1) {
return PreHeadRequest(tpath.c_str(), bpath.c_str(), savedpath.c_str(), ssekey_pos);
}
int HeadRequest(const char* tpath, headers_t& meta);
int PutHeadRequest(const char* tpath, const headers_t& meta, bool is_copy);
int PutRequest(const char* tpath, headers_t& meta, int fd);
@ -390,8 +391,6 @@ class S3fsCurl
// methods(variables)
const std::string& GetPath() const { return path; }
const std::string& GetBasePath() const { return base_path; }
const std::string& GetSpecialSavedPath() const { return saved_path; }
const std::string& GetUrl() const { return url; }
const std::string& GetOp() const { return op; }
const headers_t* GetResponseHeaders() const { return &responseHeaders; }
@ -403,10 +402,6 @@ class S3fsCurl
bool EnableUseAhbe() { return SetUseAhbe(true); }
bool DisableUseAhbe() { return SetUseAhbe(false); }
bool IsUseAhbe() const { return is_use_ahbe; }
int GetMultipartRetryCount() const { return retry_count; }
void SetMultipartRetryCount(int retrycnt) { retry_count = retrycnt; }
bool IsOverMultipartRetryCount() const { return (retry_count >= S3fsCurl::retries); }
size_t GetLastPreHeadSeecKeyPos() const { return b_ssekey_pos; }
};
#endif // S3FS_CURL_H_

View File

@ -31,6 +31,7 @@
#include <utility>
#include "common.h"
#include "s3fs.h"
#include "fdcache_entity.h"
#include "fdcache_fdinfo.h"
#include "fdcache_stat.h"

View File

@ -29,6 +29,7 @@
#include <unistd.h>
#include "common.h"
#include "s3fs.h"
#include "s3fs_logger.h"
#include "s3fs_util.h"
#include "fdcache_fdinfo.h"

View File

@ -22,6 +22,7 @@
#include <cstdlib>
#include <string>
#include "s3fs.h"
#include "s3fs_logger.h"
#include "mpu_util.h"
#include "curl.h"

View File

@ -126,8 +126,6 @@ static int check_object_access(const char* path, int mask, struct stat* pstbuf);
static int check_object_owner(const char* path, struct stat* pstbuf);
static int check_parent_object_access(const char* path, int mask);
static int get_local_fent(AutoFdEntity& autoent, FdEntity **entity, const char* path, int flags = O_RDONLY, bool is_load = false);
static bool multi_head_callback(S3fsCurl* s3fscurl, void* param);
static std::unique_ptr<S3fsCurl> multi_head_retry_callback(S3fsCurl* s3fscurl);
static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf, fuse_fill_dir_t filler);
static int list_bucket(const char* path, S3ObjList& head, const char* delimiter, bool check_content_only = false);
static int directory_empty(const char* path);
@ -208,66 +206,6 @@ static int s3fs_removexattr(const char* path, const char* name);
// The flag is accessed from child threads, so std::atomic is used for exclusive control of flags.
static std::atomic<bool> has_mp_stat;
//
// A synchronous class that calls the fuse_fill_dir_t function that processes the readdir data
//
class SyncFiller
{
private:
mutable std::mutex filler_lock;
void* filler_buff;
fuse_fill_dir_t filler_func;
std::set<std::string> filled;
public:
explicit SyncFiller(void* buff = nullptr, fuse_fill_dir_t filler = nullptr);
~SyncFiller() = default;
SyncFiller(const SyncFiller&) = delete;
SyncFiller(SyncFiller&&) = delete;
SyncFiller& operator=(const SyncFiller&) = delete;
SyncFiller& operator=(SyncFiller&&) = delete;
int Fill(const char *name, const struct stat *stbuf, off_t off);
int SufficiencyFill(const std::vector<std::string>& pathlist);
};
SyncFiller::SyncFiller(void* buff, fuse_fill_dir_t filler) : filler_buff(buff), filler_func(filler)
{
if(!filler_buff || !filler_func){
S3FS_PRN_CRIT("Internal error: SyncFiller constructor parameter is critical value.");
abort();
}
}
//
// See. prototype fuse_fill_dir_t in fuse.h
//
int SyncFiller::Fill(const char *name, const struct stat *stbuf, off_t off)
{
const std::lock_guard<std::mutex> lock(filler_lock);
int result = 0;
if(filled.insert(name).second){
result = filler_func(filler_buff, name, stbuf, off);
}
return result;
}
int SyncFiller::SufficiencyFill(const std::vector<std::string>& pathlist)
{
const std::lock_guard<std::mutex> lock(filler_lock);
int result = 0;
for(auto it = pathlist.cbegin(); it != pathlist.cend(); ++it) {
if(filled.insert(*it).second){
if(0 != filler_func(filler_buff, it->c_str(), nullptr, 0)){
result = 1;
}
}
}
return result;
}
//-------------------------------------------------------------------
// Functions
//-------------------------------------------------------------------
@ -3148,141 +3086,25 @@ static int s3fs_opendir(const char* _path, struct fuse_file_info* fi)
return result;
}
// [TODO]
// This function's argument(s3fscurl) will be checked and changed after removing S3fsMultiCurl
//
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress constParameterCallback
static bool multi_head_callback(S3fsCurl* s3fscurl, void* param)
{
if(!s3fscurl){
return false;
}
// Add stat cache
const std::string& saved_path = s3fscurl->GetSpecialSavedPath();
if(!StatCache::getStatCacheData()->AddStat(saved_path, *(s3fscurl->GetResponseHeaders()))){
S3FS_PRN_ERR("failed adding stat cache [path=%s]", saved_path.c_str());
return false;
}
// Get stats from stats cache(for converting from meta), and fill
std::string bpath = mybasename(saved_path);
if(use_wtf8){
bpath = s3fs_wtf8_decode(bpath);
}
if(param){
auto* pcbparam = reinterpret_cast<SyncFiller*>(param);
struct stat st;
if(StatCache::getStatCacheData()->GetStat(saved_path, &st)){
pcbparam->Fill(bpath.c_str(), &st, 0);
}else{
S3FS_PRN_INFO2("Could not find %s file in stat cache.", saved_path.c_str());
pcbparam->Fill(bpath.c_str(), nullptr, 0);
}
}else{
S3FS_PRN_WARN("param(multi_head_callback_param*) is nullptr, then can not call filler.");
}
return true;
}
struct multi_head_notfound_callback_param
{
std::mutex list_lock;
s3obj_list_t notfound_list;
};
// [TODO]
// This function's argument(s3fscurl) will be checked and changed after removing S3fsMultiCurl
//
static bool multi_head_notfound_callback(S3fsCurl* s3fscurl, void* param)
{
if(!s3fscurl){
return false;
}
S3FS_PRN_INFO("HEAD returned NotFound(404) for %s object, it maybe only the path exists and the object does not exist.", s3fscurl->GetPath().c_str());
if(!param){
S3FS_PRN_WARN("param(multi_head_notfound_callback_param*) is nullptr, then can not call filler.");
return false;
}
// set path to not found list
auto* pcbparam = reinterpret_cast<struct multi_head_notfound_callback_param*>(param);
const std::lock_guard<std::mutex> lock(pcbparam->list_lock);
pcbparam->notfound_list.push_back(s3fscurl->GetBasePath());
return true;
}
// [TODO]
// This function's argument(s3fscurl) will be checked and changed after removing S3fsMultiCurl
//
static std::unique_ptr<S3fsCurl> multi_head_retry_callback(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return nullptr;
}
size_t ssec_key_pos= s3fscurl->GetLastPreHeadSeecKeyPos();
int retry_count = s3fscurl->GetMultipartRetryCount();
// retry next sse key.
// if end of sse key, set retry master count is up.
ssec_key_pos = (ssec_key_pos == static_cast<size_t>(-1) ? 0 : ssec_key_pos + 1);
if(0 == S3fsCurl::GetSseKeyCount() || S3fsCurl::GetSseKeyCount() <= ssec_key_pos){
if(s3fscurl->IsOverMultipartRetryCount()){
S3FS_PRN_ERR("Over retry count(%d) limit(%s).", s3fscurl->GetMultipartRetryCount(), s3fscurl->GetSpecialSavedPath().c_str());
return nullptr;
}
ssec_key_pos = -1;
retry_count++;
}
std::unique_ptr<S3fsCurl> newcurl(new S3fsCurl(s3fscurl->IsUseAhbe()));
const std::string& path = s3fscurl->GetBasePath();
const std::string& base_path = s3fscurl->GetBasePath();
const std::string& saved_path = s3fscurl->GetSpecialSavedPath();
if(!newcurl->PreHeadRequest(path, base_path, saved_path, ssec_key_pos)){
S3FS_PRN_ERR("Could not duplicate curl object(%s).", saved_path.c_str());
return nullptr;
}
newcurl->SetMultipartRetryCount(retry_count);
return newcurl;
}
static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf, fuse_fill_dir_t filler)
{
// [TODO]
// This will be checked and changed after removing S3fsMultiCurl
//
S3fsMultiCurl curlmulti(S3fsCurl::GetMaxMultiRequest(), true); // [NOTE] run all requests to completion even if some requests fail.
s3obj_list_t headlist;
int result = 0;
S3FS_PRN_INFO1("[path=%s][list=%zu]", path, headlist.size());
S3FS_PRN_INFO1("[path=%s][head=<%s>][filler=%p]", path, head.IsEmpty() ? "empty" : "not empty", filler);
// Make base path list.
s3obj_list_t headlist;
head.GetNameList(headlist, true, false); // get name with "/".
StatCache::getStatCacheData()->GetNotruncateCache(std::string(path), headlist); // Add notruncate file name from stat cache
// Initialize S3fsMultiCurl
curlmulti.SetSuccessCallback(multi_head_callback);
curlmulti.SetRetryCallback(multi_head_retry_callback);
// Success Callback function parameter(SyncFiller object)
// Initialize SyncFiller object
SyncFiller syncfiller(buf, filler);
curlmulti.SetSuccessCallbackParam(reinterpret_cast<void*>(&syncfiller));
// Not found Callback function parameter
struct multi_head_notfound_callback_param notfound_param;
if(support_compat_dir){
curlmulti.SetNotFoundCallback(multi_head_notfound_callback);
curlmulti.SetNotFoundCallbackParam(reinterpret_cast<void*>(&notfound_param));
}
// common variables
Semaphore multi_head_sem(0);
int req_count = 0;
int req_result = 0;
int retrycount = 0;
std::mutex thparam_lock;
s3obj_list_t notfound_list;
// Make single head request(with max).
for(auto iter = headlist.cbegin(); headlist.cend() != iter; ++iter){
@ -3298,37 +3120,44 @@ static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf
if(use_wtf8){
bpath = s3fs_wtf8_decode(bpath);
}
syncfiller.Fill(bpath.c_str(), &st, 0);
syncfiller.Fill(bpath, &st, 0);
continue;
}
// First check for directory, start checking "not SSE-C".
// If checking failed, retry to check with "SSE-C" by retry callback func when SSE-C mode.
std::unique_ptr<S3fsCurl> s3fscurl(new S3fsCurl());
if(!s3fscurl->PreHeadRequest(disppath, disppath, disppath)){ // target path = cache key path.(ex "dir/")
S3FS_PRN_WARN("Could not make curl object for head request(%s).", disppath.c_str());
continue;
}
// parameter for thread worker
auto* thargs = new multi_head_req_thparam; // free in multi_head_req_threadworker
thargs->psyncfiller = &syncfiller;
thargs->pthparam_lock = &thparam_lock; // for pretrycount and presult member
thargs->pretrycount = &retrycount;
thargs->pnotfound_list = &notfound_list;
thargs->use_wtf8 = use_wtf8;
thargs->path = disppath;
thargs->presult = &req_result;
if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl))){
S3FS_PRN_WARN("Could not make curl object into multi curl(%s).", disppath.c_str());
continue;
// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = thargs;
ppoolparam.psem = &multi_head_sem;
ppoolparam.pfunc = multi_head_req_threadworker;
// setup instruction
if(!ThreadPoolMan::Instruct(ppoolparam)){
S3FS_PRN_ERR("failed setup instruction for one header request.");
delete thargs;
return -EIO;
}
++req_count;
}
headlist.clear();
// Multi request
if(0 != (result = curlmulti.Request())){
// If result is -EIO, it is something error occurred.
// This case includes that the object is encrypting(SSE) and s3fs does not have keys.
// So s3fs set result to 0 in order to continue the process.
if(-EIO == result){
S3FS_PRN_WARN("error occurred in multi request(errno=%d), but continue...", result);
result = 0;
}else{
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
return result;
}
// wait for finish all requests
while(req_count > 0){
multi_head_sem.acquire();
--req_count;
}
// print messages
if(0 != req_result){
S3FS_PRN_DBG("Some head requests returned error, first error is %d.", req_result);
}
// [NOTE]
@ -3338,7 +3167,7 @@ static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf
if(!support_compat_dir){
syncfiller.SufficiencyFill(head.GetCommonPrefixes());
}
if(support_compat_dir && !notfound_param.notfound_list.empty()){ // [NOTE] not need to lock to access this here.
if(support_compat_dir && !notfound_list.empty()){ // [NOTE] not need to lock to access this here.
// dummy header
mode_t dirmask = umask(0); // macos does not have getumask()
umask(dirmask);
@ -3352,7 +3181,7 @@ static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf
dummy_header["x-amz-meta-ctime"] = "0";
dummy_header["x-amz-meta-mtime"] = "0";
for(auto reiter = notfound_param.notfound_list.cbegin(); reiter != notfound_param.notfound_list.cend(); ++reiter){
for(auto reiter = notfound_list.cbegin(); reiter != notfound_list.cend(); ++reiter){
int dir_result;
const std::string& dirpath = *reiter;
if(-ENOTEMPTY == (dir_result = directory_empty(dirpath.c_str()))){
@ -3361,17 +3190,17 @@ static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf
// Add stat cache
if(StatCache::getStatCacheData()->AddStat(dirpath, dummy_header, true)){ // set forcedir=true
// Get stats from stats cache(for converting from meta), and fill
std::string base_path = mybasename(dirpath);
std::string bpath = mybasename(dirpath);
if(use_wtf8){
base_path = s3fs_wtf8_decode(base_path);
bpath = s3fs_wtf8_decode(bpath);
}
struct stat st;
if(StatCache::getStatCacheData()->GetStat(dirpath, &st)){
syncfiller.Fill(base_path.c_str(), &st, 0);
syncfiller.Fill(bpath, &st, 0);
}else{
S3FS_PRN_INFO2("Could not find %s directory(no dir object) in stat cache.", dirpath.c_str());
syncfiller.Fill(base_path.c_str(), nullptr, 0);
syncfiller.Fill(bpath, nullptr, 0);
}
}else{
S3FS_PRN_ERR("failed adding stat cache [path=%s], but dontinue...", dirpath.c_str());
@ -3381,8 +3210,7 @@ static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf
}
}
}
return result;
return 0;
}
static int s3fs_readdir(const char* _path, void* buf, fuse_fill_dir_t filler, off_t offset, struct fuse_file_info* fi)

View File

@ -19,10 +19,14 @@
*/
#include "common.h"
#include "s3fs.h"
#include "s3fs_threadreqs.h"
#include "threadpoolman.h"
#include "curl_util.h"
#include "s3fs_logger.h"
#include "s3fs_util.h"
#include "cache.h"
#include "string_util.h"
//-------------------------------------------------------------------
// Thread Worker functions for MultiThread Request
@ -44,6 +48,150 @@ void* head_req_threadworker(void* arg)
return reinterpret_cast<void*>(pthparam->result);
}
//
// Thread Worker function for multi head request
//
void* multi_head_req_threadworker(void* arg)
{
std::unique_ptr<multi_head_req_thparam> pthparam(static_cast<multi_head_req_thparam*>(arg));
if(!pthparam || !pthparam->psyncfiller || !pthparam->pthparam_lock || !pthparam->pretrycount || !pthparam->pnotfound_list || !pthparam->presult){
return reinterpret_cast<void*>(-EIO);
}
// Check retry max count and print debug message
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
S3FS_PRN_INFO3("Multi Head Request [filler=%p][thparam_lock=%p][retrycount=%d][notfound_list=%p][wtf8=%s][path=%s]", pthparam->psyncfiller, pthparam->pthparam_lock, *(pthparam->pretrycount), pthparam->pnotfound_list, pthparam->use_wtf8 ? "true" : "false", pthparam->path.c_str());
if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){
S3FS_PRN_ERR("Head request(%s) reached the maximum number of retry count(%d).", pthparam->path.c_str(), *(pthparam->pretrycount));
return reinterpret_cast<void*>(-EIO);
}
}
// loop for head request
S3fsCurl s3fscurl;
int result = 0;
headers_t meta; // this value is not used
while(true){
// Request
result = s3fscurl.HeadRequest(pthparam->path.c_str(), meta);
// Check result
bool isResetOffset= true;
CURLcode curlCode = s3fscurl.GetCurlCode();
long responseCode = S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET;
s3fscurl.GetResponseCode(responseCode, false);
if(CURLE_OK == curlCode){
if(responseCode < 400){
// add into stat cache
if(StatCache::getStatCacheData()->AddStat(pthparam->path, *(s3fscurl.GetResponseHeaders()))){
// Get stats from stats cache(for converting from meta), and fill
std::string bpath = mybasename(pthparam->path);
if(pthparam->use_wtf8){
bpath = s3fs_wtf8_decode(bpath);
}
struct stat st;
if(StatCache::getStatCacheData()->GetStat(pthparam->path, &st)){
pthparam->psyncfiller->Fill(bpath, &st, 0);
}else{
S3FS_PRN_INFO2("Could not find %s file in stat cache.", pthparam->path.c_str());
pthparam->psyncfiller->Fill(bpath, nullptr, 0);
}
result = 0;
}else{
S3FS_PRN_ERR("failed adding stat cache [path=%s]", pthparam->path.c_str());
if(0 == result){
result = -EIO;
}
}
break;
}else if(responseCode == 400){
// as possibly in multipart
S3FS_PRN_WARN("Head Request(%s) got 400 response code.", pthparam->path.c_str());
}else if(responseCode == 404){
// set path to not found list
S3FS_PRN_INFO("Head Request(%s) got NotFound(404), it maybe only the path exists and the object does not exist.", pthparam->path.c_str());
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
pthparam->pnotfound_list->push_back(pthparam->path);
}
break;
}else if(responseCode == 500){
// case of all other result, do retry.(11/13/2013)
// because it was found that s3fs got 500 error from S3, but could success
// to retry it.
S3FS_PRN_WARN("Head Request(%s) got 500 response code.", pthparam->path.c_str());
// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress knownConditionTrueFalse
}else if(responseCode == S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET){
// This is a case where the processing result has not yet been updated (should be very rare).
S3FS_PRN_WARN("Head Request(%s) could not get any response code.", pthparam->path.c_str());
}else{ // including S3fsCurl::S3FSCURL_RESPONSECODE_FATAL_ERROR
// Retry in other case.
S3FS_PRN_WARN("Head Request(%s) got fatal response code.", pthparam->path.c_str());
}
}else if(CURLE_OPERATION_TIMEDOUT == curlCode){
S3FS_PRN_ERR("Head Request(%s) is timeouted.", pthparam->path.c_str());
isResetOffset= false;
}else if(CURLE_PARTIAL_FILE == curlCode){
S3FS_PRN_WARN("Head Request(%s) is recieved data does not match the given size.", pthparam->path.c_str());
isResetOffset= false;
}else{
S3FS_PRN_WARN("Head Request(%s) got the result code(%d: %s)", pthparam->path.c_str(), curlCode, curl_easy_strerror(curlCode));
}
// Check retry max count
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
++(*(pthparam->pretrycount));
if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){
S3FS_PRN_ERR("Head request(%s) reached the maximum number of retry count(%d).", pthparam->path.c_str(), *(pthparam->pretrycount));
if(0 == result){
result = -EIO;
}
break;
}
}
// Setup for retry
if(isResetOffset){
S3fsCurl::ResetOffset(&s3fscurl);
}
}
// Set result code
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
if(0 == *(pthparam->presult) && 0 != result){
// keep first error
*(pthparam->presult) = result;
}
}
// [NOTE]
// The return value of a Multi Head request thread will always be 0(nullptr).
// This is because the expected value of a Head request will always be a
// response other than 200, such as 400/404/etc.
// In those error cases, this function simply outputs a message. And those
// errors(the first one) will be set to pthparam->presult and can be referenced
// by the caller.
//
return nullptr;
}
//
// Thread Worker function for delete request
//

View File

@ -26,6 +26,8 @@
#include "common.h"
#include "metaheader.h"
#include "curl.h"
#include "s3objlist.h"
#include "syncfiller.h"
//-------------------------------------------------------------------
// Structures for MultiThread Request
@ -40,6 +42,20 @@ struct head_req_thparam
int result = 0;
};
//
// Multi Head Request parameter structure for Thread Pool.
//
struct multi_head_req_thparam
{
std::string path;
SyncFiller* psyncfiller = nullptr;
std::mutex* pthparam_lock = nullptr;
int* pretrycount = nullptr;
s3obj_list_t* pnotfound_list = nullptr;
bool use_wtf8 = false;
int* presult = nullptr;
};
//
// Delete Request parameter structure for Thread Pool.
//
@ -144,6 +160,7 @@ struct get_object_req_thparam
// Thread Worker functions for MultiThread Request
//-------------------------------------------------------------------
void* head_req_threadworker(void* arg);
void* multi_head_req_threadworker(void* arg);
void* delete_req_threadworker(void* arg);
void* put_head_req_threadworker(void* arg);
void* put_req_threadworker(void* arg);

74
src/syncfiller.cpp Normal file
View File

@ -0,0 +1,74 @@
/*
* s3fs - FUSE-based file system backed by Amazon S3
*
* Copyright(C) 2007 Randy Rizun <rrizun@gmail.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <cstdio>
#include <cstdlib>
#include "s3fs_logger.h"
#include "syncfiller.h"
//-------------------------------------------------------------------
// Class SyncFiller
//-------------------------------------------------------------------
SyncFiller::SyncFiller(void* buff, fuse_fill_dir_t filler) : filler_buff(buff), filler_func(filler)
{
if(!filler_buff || !filler_func){
S3FS_PRN_CRIT("Internal error: SyncFiller constructor parameter is critical value.");
abort();
}
}
//
// See. prototype fuse_fill_dir_t in fuse.h
//
int SyncFiller::Fill(const std::string& name, const struct stat *stbuf, off_t off)
{
const std::lock_guard<std::mutex> lock(filler_lock);
int result = 0;
if(filled.insert(name).second){
result = filler_func(filler_buff, name.c_str(), stbuf, off);
}
return result;
}
int SyncFiller::SufficiencyFill(const std::vector<std::string>& pathlist)
{
const std::lock_guard<std::mutex> lock(filler_lock);
int result = 0;
for(auto it = pathlist.cbegin(); it != pathlist.cend(); ++it) {
if(filled.insert(*it).second){
if(0 != filler_func(filler_buff, it->c_str(), nullptr, 0)){
result = 1;
}
}
}
return result;
}
/*
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: expandtab sw=4 ts=4 fdm=marker
* vim<600: expandtab sw=4 ts=4
*/

67
src/syncfiller.h Normal file
View File

@ -0,0 +1,67 @@
/*
* s3fs - FUSE-based file system backed by Amazon S3
*
* Copyright(C) 2007 Randy Rizun <rrizun@gmail.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#ifndef SYNCFILLER_H_
#define SYNCFILLER_H_
#include <string>
#include <mutex>
#include <vector>
#include <set>
#include "s3fs.h"
//----------------------------------------------
// class SyncFiller
//----------------------------------------------
//
// A synchronous class that calls the fuse_fill_dir_t
// function that processes the readdir data
//
class SyncFiller
{
private:
mutable std::mutex filler_lock;
void* filler_buff;
fuse_fill_dir_t filler_func;
std::set<std::string> filled;
public:
explicit SyncFiller(void* buff = nullptr, fuse_fill_dir_t filler = nullptr);
~SyncFiller() = default;
SyncFiller(const SyncFiller&) = delete;
SyncFiller(SyncFiller&&) = delete;
SyncFiller& operator=(const SyncFiller&) = delete;
SyncFiller& operator=(SyncFiller&&) = delete;
int Fill(const std::string& name, const struct stat *stbuf, off_t off);
int SufficiencyFill(const std::vector<std::string>& pathlist);
};
#endif // SYNCFILLER_H_
/*
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: expandtab sw=4 ts=4 fdm=marker
* vim<600: expandtab sw=4 ts=4
*/