1
0
forked from Alepha/Alepha

Non-blocking states and documentation for Interlock.

This commit is contained in:
2023-12-12 23:56:17 -05:00
parent 94596d0d7d
commit c04fc7be3c
3 changed files with 61 additions and 5 deletions

View File

@ -115,7 +115,7 @@ namespace Alepha::Hydrogen::Atomic ::detail:: Mailbox_m
interlock( Alepha::unique_lock< Alepha::mutex > &lock ) interlock( Alepha::unique_lock< Alepha::mutex > &lock )
{ {
if( C::debugInterlock ) error() << "Interlock entered." << std::endl; if( C::debugInterlock ) error() << "Interlock entered." << std::endl;
interlockPoint.checkpoint( lock, [&]{ awaken( lock ); } ); interlockPoint.wait( lock, [&]{ awaken( lock ); } );
} }
void void

View File

@ -9,10 +9,18 @@ static_assert( __cplusplus > 2020'99 );
#include <Alepha/Utility/StaticValue.h> #include <Alepha/Utility/StaticValue.h>
#include <Alepha/Utility/evaluation_helpers.h> #include <Alepha/Utility/evaluation_helpers.h>
#include "error.h"
namespace Alepha::Hydrogen::detail::Thread_m namespace Alepha::Hydrogen::detail::Thread_m
{ {
namespace namespace
{ {
namespace C
{
const bool debug= false;
const bool debugWaiters= false or C::debug;
}
struct ThreadState struct ThreadState
{ {
std::mutex access; std::mutex access;
@ -105,6 +113,7 @@ namespace Alepha::Hydrogen::detail::Thread_m
struct Interlock::Impl struct Interlock::Impl
{ {
State state= Open; State state= Open;
bool threadBlocked= false;
ConditionVariable var; ConditionVariable var;
}; };
@ -143,6 +152,7 @@ namespace Alepha::Hydrogen::detail::Thread_m
if( impl().state == Waiting ) if( impl().state == Waiting )
{ {
// Nolock form will just leave the bit set. // Nolock form will just leave the bit set.
assert( not impl().threadBlocked );
} }
else if( impl().state == Synchronizing ) else if( impl().state == Synchronizing )
{ {
@ -157,16 +167,22 @@ namespace Alepha::Hydrogen::detail::Thread_m
{ {
if( impl().state == Waiting ) if( impl().state == Waiting )
{ {
impl().threadBlocked= true;
while( impl().state == Waiting ) while( impl().state == Waiting )
{ {
impl().var.wait( lock ); impl().var.wait( lock );
} }
assert( not impl().threadBlocked );
impl().state= Open; impl().state= Open;
impl().var.notify_one(); impl().var.notify_one();
} }
else if( impl().state == Synchronizing ) else if( impl().state == Synchronizing )
{ {
impl().state= Opening; impl().state= Opening;
const bool unblock= impl().threadBlocked;
impl().threadBlocked= false;
if( unblock )
{
impl().var.notify_one(); impl().var.notify_one();
while( impl().state == Opening ) while( impl().state == Opening )
@ -174,6 +190,14 @@ namespace Alepha::Hydrogen::detail::Thread_m
impl().var.wait( lock ); impl().var.wait( lock );
} }
} }
else
{
if( C::debugWaiters )
{
error() << "Not waiting on other side, as nobody's there." << std::endl;
}
}
}
else{ abort(); } else{ abort(); }
} }

View File

@ -62,6 +62,22 @@ namespace Alepha::Hydrogen ::detail:: Thread_m
~Interlock(); ~Interlock();
Interlock(); Interlock();
/*!
* Wait on interlock condition.
*
* Blocks this thread until another thread also rendezvous
* with this `Interlock`. If there already is another thread
* waiting, then that thread will be reawakened after the
* `synchronize` step.
*
* @param lock A lock on a mutex which governs exclusive
* access to the shared state among these threads.
*
* @param synchronize The action to take which
* resynchronizes the state among these threads.
*
* @note This operation is an `interruption_point`
*/
void void
wait( unique_lock< mutex > &lock, auto &&synchronize ) wait( unique_lock< mutex > &lock, auto &&synchronize )
{ {
@ -69,6 +85,22 @@ namespace Alepha::Hydrogen ::detail:: Thread_m
interlock( lock ); interlock( lock );
} }
/*!
* Notify but don't Wait on interlock condition.
*
* Marks that a thread has reached another this rendezvous
* with this `Interlock`. If there already is another thread
* waiting, then that thread will be reawakened after the
* `synchronize` step.
*
* @param lock A lock on a mutex which governs exclusive
* access to the shared state among these threads.
*
* @param synchronize The action to take which
* resynchronizes the state among these threads.
*
* @note This operation is not an `interruption_point`
*/
void void
checkpoint( unique_lock< mutex > &lock, auto &&synchronize ) checkpoint( unique_lock< mutex > &lock, auto &&synchronize )
{ {