From 94596d0d7d17666417de3c7ffe09e1e6c6506841 Mon Sep 17 00:00:00 2001 From: ADAM David Alan Martin Date: Tue, 12 Dec 2023 17:37:03 -0500 Subject: [PATCH] Interlock for mailbox. --- Atomic/Mailbox.h | 31 +++++++++-------- Thread.cc | 86 ++++++++++++++++++++++++++++++++++++++++++++++++ Thread.h | 45 +++++++++++++++++++++++++ 3 files changed, 146 insertions(+), 16 deletions(-) diff --git a/Atomic/Mailbox.h b/Atomic/Mailbox.h index 3090c20..e5544a9 100644 --- a/Atomic/Mailbox.h +++ b/Atomic/Mailbox.h @@ -87,8 +87,7 @@ namespace Alepha::Hydrogen::Atomic ::detail:: Mailbox_m const std::size_t boxLimit; Alepha::mutex access; - Alepha::condition_variable ready; - bool suspended= false; + Alepha::Interlock interlockPoint; std::size_t weight= 0; std::vector< Item > preparing; @@ -104,25 +103,26 @@ namespace Alepha::Hydrogen::Atomic ::detail:: Mailbox_m void awaken( Lock &lock ) { + filled.clear(); + weight= 0; + + using std::swap; swap( filled, preparing ); pos= begin( filled ); - preparing.clear(); - suspended= false; - weight= 0; - ready.notify_one(); } - template< typename Lock > void - interlock( Lock &lock ) + interlock( Alepha::unique_lock< Alepha::mutex > &lock ) { if( C::debugInterlock ) error() << "Interlock entered." << std::endl; - if( suspended ) awaken( lock ); - else - { - suspended= true; - while( suspended ) ready.wait( lock ); - } + interlockPoint.checkpoint( lock, [&]{ awaken( lock ); } ); + } + + void + checkpoint( Alepha::unique_lock< Alepha::mutex > &lock ) + { + if( C::debugInterlock ) error() << "Checkpoint entered." << std::endl; + interlockPoint.checkpoint( lock, [&]{ awaken( lock ); } ); } [[noreturn]] void @@ -222,8 +222,7 @@ namespace Alepha::Hydrogen::Atomic ::detail:: Mailbox_m Alepha::unique_lock lock( access ); assertion( not finished ); finished= true; - if( suspended ) interlock( lock ); - else suspended= true; + checkpoint( lock ); } }; } diff --git a/Thread.cc b/Thread.cc index 3c4e5ef..edb31f2 100644 --- a/Thread.cc +++ b/Thread.cc @@ -91,6 +91,92 @@ namespace Alepha::Hydrogen::detail::Thread_m state()->interruption_point(); } + namespace + { + enum State + { + Open, + Waiting, + Synchronizing, + Opening, + }; + } + + struct Interlock::Impl + { + State state= Open; + + ConditionVariable var; + }; + + Interlock::Interlock() : pimpl( std::make_unique< Impl >() ) {} + Interlock::~Interlock()= default; + + bool + Interlock::waiter( unique_lock< mutex > &lock ) + { + // Defensively, if a thread called `checkpoint`, we + // might encounter that thread here, and need + // that thread to terminate the previous interlock + // phase. + while( impl().state == Opening ) + { + impl().var.wait( lock ); + } + + if( impl().state == Open ) + { + impl().state= Waiting; + return false; + } + else if( impl().state == Waiting ) + { + impl().state= Synchronizing; + return true; + } + else { abort(); } + } + + void + Interlock::noblock( unique_lock< mutex > &lock ) + { + if( impl().state == Waiting ) + { + // Nolock form will just leave the bit set. + } + else if( impl().state == Synchronizing ) + { + impl().state= Opening; + impl().var.notify_one(); + } + else { abort(); } + } + + void + Interlock::interlock( unique_lock< mutex > &lock ) + { + if( impl().state == Waiting ) + { + while( impl().state == Waiting ) + { + impl().var.wait( lock ); + } + impl().state= Open; + impl().var.notify_one(); + } + else if( impl().state == Synchronizing ) + { + impl().state= Opening; + impl().var.notify_one(); + + while( impl().state == Opening ) + { + impl().var.wait( lock ); + } + } + else{ abort(); } + } + static auto init= Utility::enroll <=[] { state()= std::make_shared< ThreadState >(); diff --git a/Thread.h b/Thread.h index 19d35a3..95e018d 100644 --- a/Thread.h +++ b/Thread.h @@ -27,11 +27,56 @@ namespace Alepha::Hydrogen ::detail:: Thread_m class ConditionVariable; using condition_variable= ConditionVariable; + /*! + * An `Interlock` is kind of like a condition variable, for two cooperating threads. + * + * An `Interlock` object represents the state which permits two cooperating threads to + * perform a synchronization action. Each side enters `Interlock::interlock`, and + * when both of them have entered, the synchronization function will be executed. + * + * In some sense it is similar to a `std::barrier` with a capacity to be interrupted + * and accepts parameter to pass as the interlock-synchronization action. + */ + class Interlock; + using std::mutex; using std::lock_guard; using std::unique_lock; } + class exports::Interlock + { + private: + struct Impl; + std::unique_ptr< Impl > pimpl; + + auto &impl() { return *pimpl; } + const auto &impl() const { return *pimpl; } + + bool waiter( unique_lock< mutex > & ); + + void interlock( unique_lock< mutex > &lock ); + void noblock( unique_lock< mutex > &lock ); + + public: + ~Interlock(); + Interlock(); + + void + wait( unique_lock< mutex > &lock, auto &&synchronize ) + { + if( waiter( lock ) ) synchronize(); + interlock( lock ); + } + + void + checkpoint( unique_lock< mutex > &lock, auto &&synchronize ) + { + if( waiter( lock ) ) synchronize(); + noblock( lock ); + } + }; + class exports::ConditionVariable { private: