From 61abf80197e3d92c45d406d09aa0b0b0ddd93615 Mon Sep 17 00:00:00 2001 From: Takeshi Nakatani Date: Sun, 10 Nov 2024 09:29:06 +0000 Subject: [PATCH] Organized multi-threading related options --- doc/man/s3fs.1.in | 18 ++++-------------- src/curl.cpp | 16 ---------------- src/curl.h | 7 ------- src/fdcache_entity.cpp | 2 +- src/s3fs.cpp | 29 ++++++++++++----------------- src/s3fs_help.cpp | 22 ++++++---------------- src/threadpoolman.cpp | 26 +++++++++++++++++++++++++- src/threadpoolman.h | 5 ++++- 8 files changed, 52 insertions(+), 73 deletions(-) diff --git a/doc/man/s3fs.1.in b/doc/man/s3fs.1.in index 7258452..8731e26 100644 --- a/doc/man/s3fs.1.in +++ b/doc/man/s3fs.1.in @@ -197,15 +197,6 @@ s3fs is always using DNS cache, this option make DNS cache disable. \fB\-o\fR nosscache - disable SSL session cache. s3fs is always using SSL session cache, this option make SSL session cache disable. .TP -\fB\-o\fR multireq_max (default="20") -maximum number of parallel request for listing objects. -.TP -\fB\-o\fR parallel_count (default="5") -number of parallel request for uploading big objects. -s3fs uploads large object (over 25MB by default) by multipart post request, and sends parallel requests. -This option limits parallel request count which s3fs requests at once. -It is necessary to set this value depending on a CPU and a network band. -.TP \fB\-o\fR multipart_size (default="10") part size, in MB, for each multipart request. The minimum value is 5 MB and the maximum value is 5 GB. @@ -296,10 +287,9 @@ If this option is enabled, a sequential upload will be performed in parallel wit This is expected to give better performance than other upload functions. Note that this option is still experimental and may change in the future. .TP -\fB\-o\fR max_thread_count (default is "5") -Specifies the number of threads waiting for stream uploads. -Note that this option and Stream Upload are still experimental and subject to change in the future. -This option will be merged with "parallel_count" in the future. +\fB\-o\fR max_thread_count (default is "10") +This value is the maximum number of parallel requests to be sent, and the number of parallel processes for head requests, multipart uploads and stream uploads. +Worker threads will be started to process requests according to this value. .TP \fB\-o\fR enable_content_md5 (default is disable) Allow S3 server to check data integrity of uploads via the Content-MD5 header. @@ -522,7 +512,7 @@ s3fs is a multi-threaded application. Depending on the workload it may use multi .TP .SS Performance of S3 requests .TP -s3fs provides several options (e.g. "\-o multipart_size", "\-o parallel_count") to control behaviour and thus indirectly the performance. The possible combinations of these options in conjunction with the various S3 backends are so varied that there is no individual recommendation other than the default values. Improved individual settings can be found by testing and measuring. +s3fs provides several options (e.g. "max_thread_count" option) to control behaviour and thus indirectly the performance. The possible combinations of these options in conjunction with the various S3 backends are so varied that there is no individual recommendation other than the default values. Improved individual settings can be found by testing and measuring. .TP The two options "Enable no object cache" ("\-o enable_noobj_cache") and "Disable support of alternative directory names" ("\-o notsup_compat_dir") can be used to control shared access to the same bucket by different applications: .TP diff --git a/src/curl.cpp b/src/curl.cpp index 9a5e120..6ff8fe3 100644 --- a/src/curl.cpp +++ b/src/curl.cpp @@ -129,8 +129,6 @@ std::map S3fsCurl::curl_progress; std::string S3fsCurl::curl_ca_bundle; mimes_t S3fsCurl::mimeTypes; std::string S3fsCurl::userAgent; -int S3fsCurl::max_parallel_cnt = 5; // default -int S3fsCurl::max_multireq = 20; // default off_t S3fsCurl::multipart_size = MULTIPART_SIZE; // default off_t S3fsCurl::multipart_copy_size = 512 * 1024 * 1024; // default signature_type_t S3fsCurl::signature_type = signature_type_t::V2_OR_V4; // default @@ -946,20 +944,6 @@ bool S3fsCurl::SetMultipartCopySize(off_t size) return true; } -int S3fsCurl::SetMaxParallelCount(int value) -{ - int old = S3fsCurl::max_parallel_cnt; - S3fsCurl::max_parallel_cnt = value; - return old; -} - -int S3fsCurl::SetMaxMultiRequest(int max) -{ - int old = S3fsCurl::max_multireq; - S3fsCurl::max_multireq = max; - return old; -} - // [NOTE] // This proxy setting is as same as the "--proxy" option of the curl command, // and equivalent to the "CURLOPT_PROXY" option of the curl_easy_setopt() diff --git a/src/curl.h b/src/curl.h index 2294ca9..de09db9 100644 --- a/src/curl.h +++ b/src/curl.h @@ -151,7 +151,6 @@ class S3fsCurl static std::string curl_ca_bundle; static mimes_t mimeTypes; static std::string userAgent; - static int max_parallel_cnt; static int max_multireq; static off_t multipart_size; static off_t multipart_copy_size; @@ -302,12 +301,6 @@ class S3fsCurl static long GetSslVerifyHostname() { return S3fsCurl::ssl_verify_hostname; } static bool SetSSLClientCertOptions(const std::string& values); static void ResetOffset(S3fsCurl* pCurl); - // maximum parallel GET and PUT requests - static int SetMaxParallelCount(int value); - static int GetMaxParallelCount() { return S3fsCurl::max_parallel_cnt; } - // maximum parallel HEAD requests - static int SetMaxMultiRequest(int max); - static int GetMaxMultiRequest() { return S3fsCurl::max_multireq; } static bool SetMultipartSize(off_t size); static off_t GetMultipartSize() { return S3fsCurl::multipart_size; } static bool SetMultipartCopySize(off_t size); diff --git a/src/fdcache_entity.cpp b/src/fdcache_entity.cpp index abbeeeb..811396a 100644 --- a/src/fdcache_entity.cpp +++ b/src/fdcache_entity.cpp @@ -2036,7 +2036,7 @@ ssize_t FdEntity::Read(int fd, char* bytes, off_t start, size_t size, bool force // load size(for prefetch) size_t load_size = size; if(start + static_cast(size) < pagelist.Size()){ - ssize_t prefetch_max_size = std::max(static_cast(size), S3fsCurl::GetMultipartSize() * S3fsCurl::GetMaxParallelCount()); + ssize_t prefetch_max_size = std::max(static_cast(size), S3fsCurl::GetMultipartSize() * ThreadPoolMan::GetWorkerCount()); if(start + prefetch_max_size < pagelist.Size()){ load_size = prefetch_max_size; diff --git a/src/s3fs.cpp b/src/s3fs.cpp index c53440f..b1f2e3f 100644 --- a/src/s3fs.cpp +++ b/src/s3fs.cpp @@ -106,7 +106,6 @@ static int max_keys_list_object = 1000;// default is 1000 static off_t max_dirty_data = 5LL * 1024LL * 1024LL * 1024LL; static bool use_wtf8 = false; static off_t fake_diskfree_size = -1; // default is not set(-1) -static int max_thread_count = 5; // default is 5 static bool update_parent_dir_stat= false; // default not updating parent directory stats static fsblkcnt_t bucket_block_count; // advertised block count of the bucket static unsigned long s3fs_block_size = 16 * 1024 * 1024; // s3fs block size is 16MB @@ -4044,8 +4043,8 @@ static void* s3fs_init(struct fuse_conn_info* conn) S3FS_PRN_DBG("Could not initialize cache directory."); } - if(!ThreadPoolMan::Initialize(max_thread_count)){ - S3FS_PRN_CRIT("Could not create thread pool(%d)", max_thread_count); + if(!ThreadPoolMan::Initialize()){ + S3FS_PRN_CRIT("Could not create thread pool(%d)", ThreadPoolMan::GetWorkerCount()); s3fs_exit_fuseloop(EXIT_FAILURE); } @@ -4691,11 +4690,6 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar is_remove_cache = true; return 0; } - else if(is_prefix(arg, "multireq_max=")){ - int maxreq = static_cast(cvt_strtoofft(strchr(arg, '=') + sizeof(char), /*base=*/ 10)); - S3fsCurl::SetMaxMultiRequest(maxreq); - return 0; - } else if(0 == strcmp(arg, "nonempty")){ nonempty = true; return 1; // need to continue for fuse. @@ -4941,13 +4935,15 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar S3fsCurlShare::SetSslSessionCache(false); return 0; } - else if(is_prefix(arg, "parallel_count=") || is_prefix(arg, "parallel_upload=")){ - int maxpara = static_cast(cvt_strtoofft(strchr(arg, '=') + sizeof(char), /*base=*/ 10)); - if(0 >= maxpara){ - S3FS_PRN_EXIT("argument should be over 1: parallel_count"); + else if(is_prefix(arg, "multireq_max=") || is_prefix(arg, "parallel_count=") || is_prefix(arg, "parallel_upload=")){ + S3FS_PRN_WARN("The multireq_max, parallel_count and parallel_upload options have been deprecated and merged with max_thread_count. In the near future, these options will no longer be available. For compatibility, the values you specify for these options will be treated as max_thread_count."); + + int max_thcount = static_cast(cvt_strtoofft(strchr(arg, '=') + sizeof(char), /*base=*/ 10)); + if(0 >= max_thcount){ + S3FS_PRN_EXIT("argument should be over 1: max_thread_count"); return -1; } - S3fsCurl::SetMaxParallelCount(maxpara); + ThreadPoolMan::SetWorkerCount(max_thcount); return 0; } else if(is_prefix(arg, "max_thread_count=")){ @@ -4956,8 +4952,7 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar S3FS_PRN_EXIT("argument should be over 1: max_thread_count"); return -1; } - max_thread_count = max_thcount; - S3FS_PRN_WARN("The max_thread_count option is not a formal option. Please note that it will change in the future."); + ThreadPoolMan::SetWorkerCount(max_thcount); return 0; } else if(is_prefix(arg, "fd_page_size=")){ @@ -5610,12 +5605,12 @@ int main(int argc, char* argv[]) } // Check free disk space for maultipart request - if(!FdManager::IsSafeDiskSpace(nullptr, S3fsCurl::GetMultipartSize() * S3fsCurl::GetMaxParallelCount())){ + if(!FdManager::IsSafeDiskSpace(nullptr, S3fsCurl::GetMultipartSize() * ThreadPoolMan::GetWorkerCount())){ // Try to clean cache dir and retry S3FS_PRN_WARN("No enough disk space for s3fs, try to clean cache dir"); FdManager::get()->CleanupCacheDir(); - if(!FdManager::IsSafeDiskSpace(nullptr, S3fsCurl::GetMultipartSize() * S3fsCurl::GetMaxParallelCount(), true)){ + if(!FdManager::IsSafeDiskSpace(nullptr, S3fsCurl::GetMultipartSize() * ThreadPoolMan::GetWorkerCount(), true)){ S3fsCurl::DestroyS3fsCurl(); s3fs_destroy_global_ssl(); exit(EXIT_FAILURE); diff --git a/src/s3fs_help.cpp b/src/s3fs_help.cpp index 46ca481..c377f04 100644 --- a/src/s3fs_help.cpp +++ b/src/s3fs_help.cpp @@ -239,17 +239,6 @@ static constexpr char help_string[] = " - s3fs is always using SSL session cache, this option make SSL \n" " session cache disable.\n" "\n" - " multireq_max (default=\"20\")\n" - " - maximum number of parallel request for listing objects.\n" - "\n" - " parallel_count (default=\"5\")\n" - " - number of parallel request for uploading big objects.\n" - " s3fs uploads large object (over 20MB) by multipart upload request, \n" - " and sends parallel requests.\n" - " This option limits parallel request count which s3fs requests \n" - " at once. It is necessary to set this value depending on a CPU \n" - " and a network band.\n" - "\n" " multipart_size (default=\"10\")\n" " - part size, in MB, for each multipart request.\n" " The minimum value is 5 MB and the maximum value is 5 GB.\n" @@ -357,11 +346,12 @@ static constexpr char help_string[] = " Note that this option is still experimental and may change in the\n" " future.\n" "\n" - " max_thread_count (default is \"5\")\n" - " - Specifies the number of threads waiting for stream uploads.\n" - " Note that this option and Stream Upload are still experimental\n" - " and subject to change in the future.\n" - " This option will be merged with \"parallel_count\" in the future.\n" + " max_thread_count (default is \"10\")\n" + " - This value is the maximum number of parallel requests to be\n" + " sent, and the number of parallel processes for head requests,\n" + " multipart uploads and stream uploads.\n" + " Worker threads will be started to process requests according to\n" + " this value.\n" "\n" " enable_content_md5 (default is disable)\n" " - Allow S3 server to check data integrity of uploads via the\n" diff --git a/src/threadpoolman.cpp b/src/threadpoolman.cpp index 6c36db5..653bf05 100644 --- a/src/threadpoolman.cpp +++ b/src/threadpoolman.cpp @@ -34,6 +34,7 @@ //------------------------------------------------ // ThreadPoolMan class variables //------------------------------------------------ +int ThreadPoolMan::worker_count = 10; // default std::unique_ptr ThreadPoolMan::singleton; //------------------------------------------------ @@ -45,7 +46,11 @@ bool ThreadPoolMan::Initialize(int count) S3FS_PRN_CRIT("Already singleton for Thread Manager exists."); abort(); } - ThreadPoolMan::singleton.reset(new ThreadPoolMan(count)); + if(-1 != count){ + ThreadPoolMan::SetWorkerCount(count); + } + ThreadPoolMan::singleton.reset(new ThreadPoolMan(ThreadPoolMan::worker_count)); + return true; } @@ -54,6 +59,25 @@ void ThreadPoolMan::Destroy() ThreadPoolMan::singleton.reset(); } +int ThreadPoolMan::SetWorkerCount(int count) +{ + if(0 >= count){ + S3FS_PRN_ERR("Thread worker count(%d) must be positive number.", count); + return -1; + } + if(count == ThreadPoolMan::worker_count){ + return ThreadPoolMan::worker_count; + } + + // [TODO] + // If we need to dynamically change worker threads, this is + // where we would terminate/add workers. + // + int old = ThreadPoolMan::worker_count; + ThreadPoolMan::worker_count = count; + return old; +} + bool ThreadPoolMan::Instruct(const thpoolman_param& param) { if(!ThreadPoolMan::singleton){ diff --git a/src/threadpoolman.h b/src/threadpoolman.h index 7704b5d..228f4cb 100644 --- a/src/threadpoolman.h +++ b/src/threadpoolman.h @@ -63,6 +63,7 @@ typedef std::list thpoolman_params_t; class ThreadPoolMan { private: + static int worker_count; static std::unique_ptr singleton; std::atomic is_exit; @@ -91,8 +92,10 @@ class ThreadPoolMan ThreadPoolMan& operator=(const ThreadPoolMan&) = delete; ThreadPoolMan& operator=(ThreadPoolMan&&) = delete; - static bool Initialize(int count); + static bool Initialize(int count = -1); static void Destroy(); + static int SetWorkerCount(int count); + static int GetWorkerCount() { return ThreadPoolMan::worker_count; } static bool Instruct(const thpoolman_param& pparam); static bool AwaitInstruct(const thpoolman_param& param); };