From c04fc7be3c669cbca2eb1ad99e2c4c2ca157e114 Mon Sep 17 00:00:00 2001 From: ADAM David Alan Martin Date: Tue, 12 Dec 2023 23:56:17 -0500 Subject: [PATCH] Non-blocking states and documentation for `Interlock`. --- Atomic/Mailbox.h | 2 +- Thread.cc | 32 ++++++++++++++++++++++++++++---- Thread.h | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 5 deletions(-) diff --git a/Atomic/Mailbox.h b/Atomic/Mailbox.h index e5544a9..c21ae04 100644 --- a/Atomic/Mailbox.h +++ b/Atomic/Mailbox.h @@ -115,7 +115,7 @@ namespace Alepha::Hydrogen::Atomic ::detail:: Mailbox_m interlock( Alepha::unique_lock< Alepha::mutex > &lock ) { if( C::debugInterlock ) error() << "Interlock entered." << std::endl; - interlockPoint.checkpoint( lock, [&]{ awaken( lock ); } ); + interlockPoint.wait( lock, [&]{ awaken( lock ); } ); } void diff --git a/Thread.cc b/Thread.cc index edb31f2..82fcb1e 100644 --- a/Thread.cc +++ b/Thread.cc @@ -9,10 +9,18 @@ static_assert( __cplusplus > 2020'99 ); #include #include +#include "error.h" + namespace Alepha::Hydrogen::detail::Thread_m { namespace { + namespace C + { + const bool debug= false; + const bool debugWaiters= false or C::debug; + } + struct ThreadState { std::mutex access; @@ -105,6 +113,7 @@ namespace Alepha::Hydrogen::detail::Thread_m struct Interlock::Impl { State state= Open; + bool threadBlocked= false; ConditionVariable var; }; @@ -143,6 +152,7 @@ namespace Alepha::Hydrogen::detail::Thread_m if( impl().state == Waiting ) { // Nolock form will just leave the bit set. + assert( not impl().threadBlocked ); } else if( impl().state == Synchronizing ) { @@ -157,21 +167,35 @@ namespace Alepha::Hydrogen::detail::Thread_m { if( impl().state == Waiting ) { + impl().threadBlocked= true; while( impl().state == Waiting ) { impl().var.wait( lock ); } + assert( not impl().threadBlocked ); impl().state= Open; impl().var.notify_one(); } else if( impl().state == Synchronizing ) { impl().state= Opening; - impl().var.notify_one(); - - while( impl().state == Opening ) + const bool unblock= impl().threadBlocked; + impl().threadBlocked= false; + if( unblock ) { - impl().var.wait( lock ); + impl().var.notify_one(); + + while( impl().state == Opening ) + { + impl().var.wait( lock ); + } + } + else + { + if( C::debugWaiters ) + { + error() << "Not waiting on other side, as nobody's there." << std::endl; + } } } else{ abort(); } diff --git a/Thread.h b/Thread.h index 95e018d..072a42d 100644 --- a/Thread.h +++ b/Thread.h @@ -62,6 +62,22 @@ namespace Alepha::Hydrogen ::detail:: Thread_m ~Interlock(); Interlock(); + /*! + * Wait on interlock condition. + * + * Blocks this thread until another thread also rendezvous + * with this `Interlock`. If there already is another thread + * waiting, then that thread will be reawakened after the + * `synchronize` step. + * + * @param lock A lock on a mutex which governs exclusive + * access to the shared state among these threads. + * + * @param synchronize The action to take which + * resynchronizes the state among these threads. + * + * @note This operation is an `interruption_point` + */ void wait( unique_lock< mutex > &lock, auto &&synchronize ) { @@ -69,6 +85,22 @@ namespace Alepha::Hydrogen ::detail:: Thread_m interlock( lock ); } + /*! + * Notify but don't Wait on interlock condition. + * + * Marks that a thread has reached another this rendezvous + * with this `Interlock`. If there already is another thread + * waiting, then that thread will be reawakened after the + * `synchronize` step. + * + * @param lock A lock on a mutex which governs exclusive + * access to the shared state among these threads. + * + * @param synchronize The action to take which + * resynchronizes the state among these threads. + * + * @note This operation is not an `interruption_point` + */ void checkpoint( unique_lock< mutex > &lock, auto &&synchronize ) {