1
0
forked from Alepha/Alepha

Interlock for mailbox.

This commit is contained in:
2023-12-12 17:37:03 -05:00
parent bf207244b6
commit 94596d0d7d
3 changed files with 146 additions and 16 deletions

View File

@ -87,8 +87,7 @@ namespace Alepha::Hydrogen::Atomic ::detail:: Mailbox_m
const std::size_t boxLimit;
Alepha::mutex access;
Alepha::condition_variable ready;
bool suspended= false;
Alepha::Interlock interlockPoint;
std::size_t weight= 0;
std::vector< Item > preparing;
@ -104,25 +103,26 @@ namespace Alepha::Hydrogen::Atomic ::detail:: Mailbox_m
void
awaken( Lock &lock )
{
filled.clear();
weight= 0;
using std::swap;
swap( filled, preparing );
pos= begin( filled );
preparing.clear();
suspended= false;
weight= 0;
ready.notify_one();
}
template< typename Lock >
void
interlock( Lock &lock )
interlock( Alepha::unique_lock< Alepha::mutex > &lock )
{
if( C::debugInterlock ) error() << "Interlock entered." << std::endl;
if( suspended ) awaken( lock );
else
{
suspended= true;
while( suspended ) ready.wait( lock );
}
interlockPoint.checkpoint( lock, [&]{ awaken( lock ); } );
}
void
checkpoint( Alepha::unique_lock< Alepha::mutex > &lock )
{
if( C::debugInterlock ) error() << "Checkpoint entered." << std::endl;
interlockPoint.checkpoint( lock, [&]{ awaken( lock ); } );
}
[[noreturn]] void
@ -222,8 +222,7 @@ namespace Alepha::Hydrogen::Atomic ::detail:: Mailbox_m
Alepha::unique_lock lock( access );
assertion( not finished );
finished= true;
if( suspended ) interlock( lock );
else suspended= true;
checkpoint( lock );
}
};
}

View File

@ -91,6 +91,92 @@ namespace Alepha::Hydrogen::detail::Thread_m
state()->interruption_point();
}
namespace
{
enum State
{
Open,
Waiting,
Synchronizing,
Opening,
};
}
struct Interlock::Impl
{
State state= Open;
ConditionVariable var;
};
Interlock::Interlock() : pimpl( std::make_unique< Impl >() ) {}
Interlock::~Interlock()= default;
bool
Interlock::waiter( unique_lock< mutex > &lock )
{
// Defensively, if a thread called `checkpoint`, we
// might encounter that thread here, and need
// that thread to terminate the previous interlock
// phase.
while( impl().state == Opening )
{
impl().var.wait( lock );
}
if( impl().state == Open )
{
impl().state= Waiting;
return false;
}
else if( impl().state == Waiting )
{
impl().state= Synchronizing;
return true;
}
else { abort(); }
}
void
Interlock::noblock( unique_lock< mutex > &lock )
{
if( impl().state == Waiting )
{
// Nolock form will just leave the bit set.
}
else if( impl().state == Synchronizing )
{
impl().state= Opening;
impl().var.notify_one();
}
else { abort(); }
}
void
Interlock::interlock( unique_lock< mutex > &lock )
{
if( impl().state == Waiting )
{
while( impl().state == Waiting )
{
impl().var.wait( lock );
}
impl().state= Open;
impl().var.notify_one();
}
else if( impl().state == Synchronizing )
{
impl().state= Opening;
impl().var.notify_one();
while( impl().state == Opening )
{
impl().var.wait( lock );
}
}
else{ abort(); }
}
static auto init= Utility::enroll <=[]
{
state()= std::make_shared< ThreadState >();

View File

@ -27,11 +27,56 @@ namespace Alepha::Hydrogen ::detail:: Thread_m
class ConditionVariable;
using condition_variable= ConditionVariable;
/*!
* An `Interlock` is kind of like a condition variable, for two cooperating threads.
*
* An `Interlock` object represents the state which permits two cooperating threads to
* perform a synchronization action. Each side enters `Interlock::interlock`, and
* when both of them have entered, the synchronization function will be executed.
*
* In some sense it is similar to a `std::barrier` with a capacity to be interrupted
* and accepts parameter to pass as the interlock-synchronization action.
*/
class Interlock;
using std::mutex;
using std::lock_guard;
using std::unique_lock;
}
class exports::Interlock
{
private:
struct Impl;
std::unique_ptr< Impl > pimpl;
auto &impl() { return *pimpl; }
const auto &impl() const { return *pimpl; }
bool waiter( unique_lock< mutex > & );
void interlock( unique_lock< mutex > &lock );
void noblock( unique_lock< mutex > &lock );
public:
~Interlock();
Interlock();
void
wait( unique_lock< mutex > &lock, auto &&synchronize )
{
if( waiter( lock ) ) synchronize();
interlock( lock );
}
void
checkpoint( unique_lock< mutex > &lock, auto &&synchronize )
{
if( waiter( lock ) ) synchronize();
noblock( lock );
}
};
class exports::ConditionVariable
{
private: