Make psemaphore similar to C++20 std::counting_semaphore (#2569)

This commit is contained in:
Andrew Gaul 2024-10-29 08:23:05 +09:00 committed by GitHub
parent 07881195f2
commit 3b226ed672
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 37 additions and 30 deletions

View File

@ -34,6 +34,7 @@
#include "common.h" #include "common.h"
#include "fdcache_page.h" #include "fdcache_page.h"
#include "metaheader.h" #include "metaheader.h"
#include "psemaphore.h"
#include "s3fs_util.h" #include "s3fs_util.h"
#include "types.h" #include "types.h"
@ -86,7 +87,6 @@ typedef std::unique_ptr<CURL, decltype(&curl_easy_cleanup)> CurlUniquePtr;
class CurlHandlerPool; class CurlHandlerPool;
class S3fsCred; class S3fsCred;
class S3fsCurl; class S3fsCurl;
class Semaphore;
// Prototype function for lazy setup options for curl handle // Prototype function for lazy setup options for curl handle
typedef bool (*s3fscurl_lazy_setup)(S3fsCurl* s3fscurl); typedef bool (*s3fscurl_lazy_setup)(S3fsCurl* s3fscurl);

View File

@ -119,7 +119,8 @@ int S3fsMultiCurl::MultiPerform()
std::map<std::thread::id, std::pair<std::thread, std::future<int>>> threads; std::map<std::thread::id, std::pair<std::thread, std::future<int>>> threads;
int result = 0; int result = 0;
bool isMultiHead = false; bool isMultiHead = false;
Semaphore sem(GetMaxParallelism()); int semCount = GetMaxParallelism();
Semaphore sem(semCount);
for(auto iter = clist_req.cbegin(); iter != clist_req.cend(); ++iter) { for(auto iter = clist_req.cbegin(); iter != clist_req.cend(); ++iter) {
S3fsCurl* s3fscurl = iter->get(); S3fsCurl* s3fscurl = iter->get();
@ -127,7 +128,7 @@ int S3fsMultiCurl::MultiPerform()
continue; continue;
} }
sem.wait(); sem.acquire();
{ {
const std::lock_guard<std::mutex> lock(completed_tids_lock); const std::lock_guard<std::mutex> lock(completed_tids_lock);
@ -155,8 +156,8 @@ int S3fsMultiCurl::MultiPerform()
threads.emplace(std::piecewise_construct, std::forward_as_tuple(thread_id), std::forward_as_tuple(std::move(thread), std::move(future))); threads.emplace(std::piecewise_construct, std::forward_as_tuple(thread_id), std::forward_as_tuple(std::move(thread), std::move(future)));
} }
for(int i = 0; i < sem.get_value(); ++i){ for(int i = 0; i < semCount; ++i){
sem.wait(); sem.acquire();
} }
const std::lock_guard<std::mutex> lock(completed_tids_lock); const std::lock_guard<std::mutex> lock(completed_tids_lock);
@ -355,7 +356,7 @@ void S3fsMultiCurl::RequestPerformWrapper(S3fsCurl* s3fscurl, std::promise<int>
const std::lock_guard<std::mutex> lock(*s3fscurl->completed_tids_lock); const std::lock_guard<std::mutex> lock(*s3fscurl->completed_tids_lock);
s3fscurl->completed_tids->push_back(std::this_thread::get_id()); s3fscurl->completed_tids->push_back(std::this_thread::get_id());
s3fscurl->sem->post(); s3fscurl->sem->release();
promise.set_value(result); promise.set_value(result);
} }

View File

@ -646,7 +646,7 @@ int PseudoFdInfo::WaitAllThreadsExit()
while(is_loop){ while(is_loop){
// need to wait the worker exiting // need to wait the worker exiting
uploaded_sem.wait(); uploaded_sem.acquire();
{ {
const std::lock_guard<std::mutex> lock(upload_list_lock); const std::lock_guard<std::mutex> lock(upload_list_lock);
if(0 < completed_count){ if(0 < completed_count){

View File

@ -24,6 +24,13 @@
//------------------------------------------------------------------- //-------------------------------------------------------------------
// Class Semaphore // Class Semaphore
//------------------------------------------------------------------- //-------------------------------------------------------------------
#if __cplusplus >= 202002L
#include <semaphore>
typedef std::counting_semaphore<INT_MAX> Semaphore;
#else
// portability wrapper for sem_t since macOS does not implement it // portability wrapper for sem_t since macOS does not implement it
#ifdef __APPLE__ #ifdef __APPLE__
@ -36,8 +43,8 @@ class Semaphore
~Semaphore() ~Semaphore()
{ {
// macOS cannot destroy a semaphore with posts less than the initializer // macOS cannot destroy a semaphore with posts less than the initializer
for(int i = 0; i < get_value(); ++i){ for(int i = 0; i < value; ++i){
post(); release();
} }
dispatch_release(sem); dispatch_release(sem);
} }
@ -46,8 +53,8 @@ class Semaphore
Semaphore& operator=(const Semaphore&) = delete; Semaphore& operator=(const Semaphore&) = delete;
Semaphore& operator=(Semaphore&&) = delete; Semaphore& operator=(Semaphore&&) = delete;
void wait() { dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); } void acquire() { dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); }
bool try_wait() bool try_acquire()
{ {
if(0 == dispatch_semaphore_wait(sem, DISPATCH_TIME_NOW)){ if(0 == dispatch_semaphore_wait(sem, DISPATCH_TIME_NOW)){
return true; return true;
@ -55,8 +62,7 @@ class Semaphore
return false; return false;
} }
} }
void post() { dispatch_semaphore_signal(sem); } void release() { dispatch_semaphore_signal(sem); }
int get_value() const { return value; }
private: private:
int value; int value;
@ -71,16 +77,16 @@ class Semaphore
class Semaphore class Semaphore
{ {
public: public:
explicit Semaphore(int value) : value(value) { sem_init(&mutex, 0, value); } explicit Semaphore(int value) { sem_init(&mutex, 0, value); }
~Semaphore() { sem_destroy(&mutex); } ~Semaphore() { sem_destroy(&mutex); }
void wait() void acquire()
{ {
int r; int r;
do { do {
r = sem_wait(&mutex); r = sem_wait(&mutex);
} while (r == -1 && errno == EINTR); } while (r == -1 && errno == EINTR);
} }
bool try_wait() bool try_acquire()
{ {
int result; int result;
do{ do{
@ -89,16 +95,16 @@ class Semaphore
return (0 == result); return (0 == result);
} }
void post() { sem_post(&mutex); } void release() { sem_post(&mutex); }
int get_value() const { return value; }
private: private:
int value;
sem_t mutex; sem_t mutex;
}; };
#endif #endif
#endif
#endif // S3FS_SEMAPHORE_H_ #endif // S3FS_SEMAPHORE_H_
/* /*

View File

@ -100,12 +100,12 @@ void S3fsSignals::CheckCacheWorker(Semaphore* pSem)
// wait and loop // wait and loop
while(S3fsSignals::enableUsr1){ while(S3fsSignals::enableUsr1){
// wait // wait
pSem->wait(); pSem->acquire();
// cppcheck-suppress unmatchedSuppression // cppcheck-suppress unmatchedSuppression
// cppcheck-suppress knownConditionTrueFalse // cppcheck-suppress knownConditionTrueFalse
if(!S3fsSignals::enableUsr1){ if(!S3fsSignals::enableUsr1){
break; // assap break; // asap
} }
// check all cache // check all cache
@ -114,7 +114,7 @@ void S3fsSignals::CheckCacheWorker(Semaphore* pSem)
} }
// do not allow request queuing // do not allow request queuing
while(pSem->try_wait()); while(pSem->try_acquire());
} }
} }
@ -219,7 +219,7 @@ bool S3fsSignals::DestroyUsr1Handler()
S3fsSignals::enableUsr1 = false; S3fsSignals::enableUsr1 = false;
// wakeup thread // wakeup thread
pSemUsr1->post(); pSemUsr1->release();
// wait for thread exiting // wait for thread exiting
pThreadUsr1->join(); pThreadUsr1->join();
@ -235,7 +235,7 @@ bool S3fsSignals::WakeupUsr1Thread()
S3FS_PRN_ERR("The thread for SIGUSR1 is not setup."); S3FS_PRN_ERR("The thread for SIGUSR1 is not setup.");
return false; return false;
} }
pSemUsr1->post(); pSemUsr1->release();
return true; return true;
} }

View File

@ -24,7 +24,7 @@
#include <memory> #include <memory>
#include <thread> #include <thread>
class Semaphore; #include "psemaphore.h"
//---------------------------------------------- //----------------------------------------------
// class S3fsSignals // class S3fsSignals

View File

@ -76,7 +76,7 @@ void ThreadPoolMan::Worker(ThreadPoolMan* psingleton, std::promise<int> promise)
while(!psingleton->IsExit()){ while(!psingleton->IsExit()){
// wait // wait
psingleton->thpoolman_sem.wait(); psingleton->thpoolman_sem.acquire();
if(psingleton->IsExit()){ if(psingleton->IsExit()){
break; break;
@ -101,7 +101,7 @@ void ThreadPoolMan::Worker(ThreadPoolMan* psingleton, std::promise<int> promise)
S3FS_PRN_WARN("The instruction function returned with something error code(%ld).", reinterpret_cast<long>(retval)); S3FS_PRN_WARN("The instruction function returned with something error code(%ld).", reinterpret_cast<long>(retval));
} }
if(param.psem){ if(param.psem){
param.psem->post(); param.psem->release();
} }
} }
@ -156,7 +156,7 @@ bool ThreadPoolMan::StopThreads()
// all threads to exit // all threads to exit
SetExitFlag(true); SetExitFlag(true);
for(size_t waitcnt = thread_list.size(); 0 < waitcnt; --waitcnt){ for(size_t waitcnt = thread_list.size(); 0 < waitcnt; --waitcnt){
thpoolman_sem.post(); thpoolman_sem.release();
} }
// wait for threads exiting // wait for threads exiting
@ -168,7 +168,7 @@ bool ThreadPoolMan::StopThreads()
thread_list.clear(); thread_list.clear();
// reset semaphore(to zero) // reset semaphore(to zero)
while(thpoolman_sem.try_wait()){ while(thpoolman_sem.try_acquire()){
} }
return true; return true;
@ -212,7 +212,7 @@ void ThreadPoolMan::SetInstruction(const thpoolman_param& param)
} }
// run thread // run thread
thpoolman_sem.post(); thpoolman_sem.release();
} }
/* /*