Convert pthread_t to C++11 std::thread (#2481)

This has better cross-platform support and stronger type-safety.
This commit is contained in:
Andrew Gaul 2024-07-11 19:07:25 +05:30 committed by GitHub
parent 50d5a73f84
commit 437bf7ec95
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 66 additions and 98 deletions

View File

@@ -27,6 +27,7 @@
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include "common.h"
@@ -202,7 +203,7 @@ class S3fsCurl
std::string query_string; // request query string
Semaphore *sem;
std::mutex *completed_tids_lock;
std::vector<pthread_t> *completed_tids;
std::vector<std::thread::id> *completed_tids;
s3fscurl_lazy_setup fpLazySetup; // curl options for lazy setting function
CURLcode curlCode; // handle curl return

View File

@@ -21,6 +21,8 @@
#include <cstdio>
#include <cstdlib>
#include <cerrno>
#include <future>
#include <thread>
#include <vector>
#include "s3fs.h"
@@ -112,14 +114,12 @@ bool S3fsMultiCurl::SetS3fsCurlObject(std::unique_ptr<S3fsCurl> s3fscurl)
int S3fsMultiCurl::MultiPerform()
{
std::vector<pthread_t> threads;
std::map<std::thread::id, std::pair<std::thread, std::future<int>>> threads;
bool success = true;
bool isMultiHead = false;
Semaphore sem(GetMaxParallelism());
int rc;
for(s3fscurllist_t::iterator iter = clist_req.begin(); iter != clist_req.end(); ++iter) {
pthread_t thread;
S3fsCurl* s3fscurl = iter->get();
if(!s3fscurl){
continue;
@@ -129,19 +129,14 @@ int S3fsMultiCurl::MultiPerform()
{
const std::lock_guard<std::mutex> lock(completed_tids_lock);
for(std::vector<pthread_t>::iterator it = completed_tids.begin(); it != completed_tids.end(); ++it){
void* retval;
rc = pthread_join(*it, &retval);
if (rc) {
success = false;
S3FS_PRN_ERR("failed pthread_join - rc(%d) %s", rc, strerror(rc));
} else {
long int_retval = reinterpret_cast<long>(retval);
if (int_retval && !(int_retval == -ENOENT && isMultiHead)) {
S3FS_PRN_WARN("thread terminated with non-zero return code: %ld", int_retval);
}
for(const auto &thread_id : completed_tids){
auto it = threads.find(thread_id);
it->second.first.join();
long int int_retval = it->second.second.get();
if (int_retval && !(int_retval == -ENOENT && isMultiHead)) {
S3FS_PRN_WARN("thread terminated with non-zero return code: %ld", int_retval);
}
threads.erase(it);
}
completed_tids.clear();
}
@@ -151,13 +146,11 @@ int S3fsMultiCurl::MultiPerform()
isMultiHead |= s3fscurl->GetOp() == "HEAD";
rc = pthread_create(&thread, nullptr, S3fsMultiCurl::RequestPerformWrapper, static_cast<void*>(s3fscurl));
if (rc != 0) {
success = false;
S3FS_PRN_ERR("failed pthread_create - rc(%d)", rc);
break;
}
threads.push_back(thread);
std::promise<int> promise;
std::future<int> future = promise.get_future();
std::thread thread(S3fsMultiCurl::RequestPerformWrapper, s3fscurl, std::move(promise));
auto thread_id = thread.get_id();
threads.emplace(std::piecewise_construct, std::forward_as_tuple(thread_id), std::forward_as_tuple(std::move(thread), std::move(future)));
}
for(int i = 0; i < sem.get_value(); ++i){
@@ -165,19 +158,14 @@ int S3fsMultiCurl::MultiPerform()
}
const std::lock_guard<std::mutex> lock(completed_tids_lock);
for (std::vector<pthread_t>::iterator titer = completed_tids.begin(); titer != completed_tids.end(); ++titer) {
void* retval;
rc = pthread_join(*titer, &retval);
if (rc) {
success = false;
S3FS_PRN_ERR("failed pthread_join - rc(%d)", rc);
} else {
long int_retval = reinterpret_cast<long>(retval);
if (int_retval && !(int_retval == -ENOENT && isMultiHead)) {
S3FS_PRN_WARN("thread terminated with non-zero return code: %ld", int_retval);
}
for(const auto &thread_id : completed_tids){
auto it = threads.find(thread_id);
it->second.first.join();
long int int_retval = it->second.second.get();
if (int_retval && !(int_retval == -ENOENT && isMultiHead)) {
S3FS_PRN_WARN("thread terminated with non-zero return code: %ld", int_retval);
}
threads.erase(it);
}
completed_tids.clear();
@@ -343,30 +331,31 @@ int S3fsMultiCurl::Request()
//
// thread function for performing an S3fsCurl request
//
void* S3fsMultiCurl::RequestPerformWrapper(void* arg)
void S3fsMultiCurl::RequestPerformWrapper(S3fsCurl* s3fscurl, std::promise<int> promise)
{
S3fsCurl* s3fscurl= static_cast<S3fsCurl*>(arg);
void* result = nullptr;
int result = 0;
if(!s3fscurl){
return reinterpret_cast<void*>(static_cast<intptr_t>(-EIO));
// this doesn't signal completion but also never happens
promise.set_value(-EIO);
return;
}
if(s3fscurl->fpLazySetup){
if(!s3fscurl->fpLazySetup(s3fscurl)){
S3FS_PRN_ERR("Failed to lazy setup, then respond EIO.");
result = reinterpret_cast<void*>(static_cast<intptr_t>(-EIO));
result = -EIO;
}
}
if(!result){
result = reinterpret_cast<void*>(static_cast<intptr_t>(s3fscurl->RequestPerform()));
result = s3fscurl->RequestPerform();
s3fscurl->DestroyCurlHandle(true, false);
}
const std::lock_guard<std::mutex> lock(*s3fscurl->completed_tids_lock);
s3fscurl->completed_tids->push_back(pthread_self());
s3fscurl->completed_tids->push_back(std::this_thread::get_id());
s3fscurl->sem->post();
return result;
promise.set_value(result);
}
/*

View File

@@ -21,8 +21,10 @@
#ifndef S3FS_CURL_MULTI_H_
#define S3FS_CURL_MULTI_H_
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
//----------------------------------------------
@@ -54,14 +56,14 @@ class S3fsMultiCurl
void* pNotFoundCallbackParam;
std::mutex completed_tids_lock;
std::vector<pthread_t> completed_tids;
std::vector<std::thread::id> completed_tids;
private:
bool ClearEx(bool is_all);
int MultiPerform();
int MultiRead();
static void* RequestPerformWrapper(void* arg);
static void RequestPerformWrapper(S3fsCurl* s3fscurl, std::promise<int> promise);
public:
explicit S3fsMultiCurl(int maxParallelism, bool not_abort = false);

View File

@@ -22,7 +22,6 @@
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <pthread.h>
#include <unistd.h>
#include <syslog.h>
#include <sys/types.h>

View File

@@ -22,7 +22,6 @@
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <pthread.h>
#include <unistd.h>
#include <syslog.h>
#include <sys/types.h>

View File

@@ -26,7 +26,6 @@
#include <cstdlib>
#include <cerrno>
#include <mutex>
#include <pthread.h>
#include <unistd.h>
#include <sys/stat.h>
#include <openssl/evp.h>
@@ -34,6 +33,7 @@
#include <openssl/md5.h>
#include <openssl/crypto.h>
#include <openssl/err.h>
#include <thread>
#include "s3fs_auth.h"
#include "s3fs_logger.h"
@@ -99,9 +99,7 @@ static void s3fs_crypt_mutex_lock(int mode, int pos, const char* file, int line)
static unsigned long s3fs_crypt_get_threadid() __attribute__ ((unused));
static unsigned long s3fs_crypt_get_threadid()
{
// For FreeBSD etc, some system's pthread_t is structure pointer.
// Then we use cast like C style(not C++) instead of ifdef.
return (unsigned long)(pthread_self());
return static_cast<unsigned long>(std::hash<std::thread::id>()(std::this_thread::get_id()));
}
static struct CRYPTO_dynlock_value* s3fs_dyn_crypt_mutex(const char* file, int line) __attribute__ ((unused));

View File

@@ -20,7 +20,7 @@
#include <cstdio>
#include <csignal>
#include <pthread.h>
#include <thread>
#include "s3fs_logger.h"
#include "sighandlers.h"
@@ -86,14 +86,13 @@ bool S3fsSignals::SetUsr1Handler(const char* path)
return true;
}
void* S3fsSignals::CheckCacheWorker(void* arg)
void S3fsSignals::CheckCacheWorker(Semaphore* pSem)
{
Semaphore* pSem = static_cast<Semaphore*>(arg);
if(!pSem){
pthread_exit(nullptr);
return;
}
if(!S3fsSignals::enableUsr1){
pthread_exit(nullptr);
return;
}
// wait and loop
@@ -117,7 +116,6 @@ void* S3fsSignals::CheckCacheWorker(void* arg)
pSem->wait();
}
}
return nullptr;
}
void S3fsSignals::HandlerUSR2(int sig)
@@ -195,15 +193,9 @@ bool S3fsSignals::InitUsr1Handler()
}
// create thread
int result;
std::unique_ptr<Semaphore> pSemUsr1_tmp(new Semaphore(0));
std::unique_ptr<pthread_t> pThreadUsr1_tmp(new pthread_t);
if(0 != (result = pthread_create(pThreadUsr1.get(), nullptr, S3fsSignals::CheckCacheWorker, static_cast<void*>(pSemUsr1_tmp.get())))){
S3FS_PRN_ERR("Could not create thread for SIGUSR1 by %d", result);
return false;
}
pThreadUsr1.reset(new std::thread(S3fsSignals::CheckCacheWorker, pSemUsr1_tmp.get()));
pSemUsr1 = std::move(pSemUsr1_tmp);
pThreadUsr1 = std::move(pThreadUsr1_tmp);
// set handler
struct sigaction sa{};
@@ -230,12 +222,7 @@ bool S3fsSignals::DestroyUsr1Handler()
pSemUsr1->post();
// wait for thread exiting
void* retval = nullptr;
int result;
if(0 != (result = pthread_join(*pThreadUsr1, &retval))){
S3FS_PRN_ERR("Could not stop thread for SIGUSR1 by %d", result);
return false;
}
pThreadUsr1->join();
pSemUsr1.reset();
pThreadUsr1.reset();

View File

@@ -22,6 +22,7 @@
#define S3FS_SIGHANDLERS_H_
#include <memory>
#include <thread>
class Semaphore;
@@ -34,14 +35,14 @@ class S3fsSignals
static std::unique_ptr<S3fsSignals> pSingleton;
static bool enableUsr1;
std::unique_ptr<pthread_t> pThreadUsr1;
std::unique_ptr<std::thread> pThreadUsr1;
std::unique_ptr<Semaphore> pSemUsr1;
protected:
static S3fsSignals* get() { return pSingleton.get(); }
static void HandlerUSR1(int sig);
static void* CheckCacheWorker(void* arg);
static void CheckCacheWorker(Semaphore* pSem);
static void HandlerUSR2(int sig);
static bool InitUsr2Handler();

View File

@@ -21,6 +21,8 @@
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <future>
#include <thread>
#include "s3fs_logger.h"
#include "threadpoolman.h"
@@ -64,13 +66,12 @@ bool ThreadPoolMan::Instruct(const thpoolman_param& param)
//
// Thread worker
//
void* ThreadPoolMan::Worker(void* arg)
void ThreadPoolMan::Worker(ThreadPoolMan* psingleton, std::promise<int> promise)
{
ThreadPoolMan* psingleton = static_cast<ThreadPoolMan*>(arg);
if(!psingleton){
S3FS_PRN_ERR("The parameter for worker thread is invalid.");
return reinterpret_cast<void*>(-EIO);
promise.set_value(-EIO);
return;
}
S3FS_PRN_INFO3("Start worker thread in ThreadPoolMan.");
@@ -105,7 +106,7 @@ void* ThreadPoolMan::Worker(void* arg)
}
}
return nullptr;
promise.set_value(0);
}
//------------------------------------------------
@@ -158,14 +159,10 @@ bool ThreadPoolMan::StopThreads()
}
// wait for threads exiting
for(thread_list_t::const_iterator iter = thread_list.begin(); iter != thread_list.end(); ++iter){
void* retval = nullptr;
int result = pthread_join(*iter, &retval);
if(result){
S3FS_PRN_ERR("failed pthread_join - result(%d)", result);
}else{
S3FS_PRN_DBG("succeed pthread_join - return code(%ld)", reinterpret_cast<long>(retval));
}
for(auto& pair : thread_list){
pair.first.join();
long retval = pair.second.get();
S3FS_PRN_DBG("join succeeded - return code(%ld)", reinterpret_cast<long>(retval));
}
thread_list.clear();
@@ -195,14 +192,10 @@ bool ThreadPoolMan::StartThreads(int count)
SetExitFlag(false);
for(int cnt = 0; cnt < count; ++cnt){
// run thread
pthread_t thread;
int result;
if(0 != (result = pthread_create(&thread, nullptr, ThreadPoolMan::Worker, static_cast<void*>(this)))){
S3FS_PRN_ERR("failed pthread_create with return code(%d)", result);
StopThreads(); // if possible, stop all threads
return false;
}
thread_list.push_back(thread);
std::promise<int> promise;
std::future<int> future = promise.get_future();
std::thread thread(ThreadPoolMan::Worker, this, std::move(promise));
thread_list.emplace_back(std::move(thread), std::move(future));
}
return true;
}

View File

@@ -22,6 +22,7 @@
#define S3FS_THREADPOOLMAN_H_
#include <atomic>
#include <future>
#include <list>
#include <mutex>
#include <vector>
@@ -34,7 +35,7 @@
//
// Prototype function
//
typedef void* (*thpoolman_worker)(void*); // same as start_routine for pthread_create function
typedef void* (*thpoolman_worker)(void*);
//
// Parameter structure
@@ -53,8 +54,6 @@ struct thpoolman_param
typedef std::list<thpoolman_param> thpoolman_params_t;
typedef std::vector<pthread_t> thread_list_t;
//------------------------------------------------
// Class ThreadPoolMan
//------------------------------------------------
@@ -67,12 +66,12 @@ class ThreadPoolMan
Semaphore thpoolman_sem;
std::mutex thread_list_lock;
thread_list_t thread_list;
std::vector<std::pair<std::thread, std::future<int>>> thread_list;
thpoolman_params_t instruction_list;
private:
static void* Worker(void* arg);
static void Worker(ThreadPoolMan* psingleton, std::promise<int> promise);
explicit ThreadPoolMan(int count = 1);
~ThreadPoolMan();