// forked from Alepha/Alepha
// 247 lines, 4.6 KiB, C++
// Refuse to build in any language mode that is not newer than C++20
// (C++20 itself reports `__cplusplus` as 202002, which fails this check).
static_assert( __cplusplus > 2020'99 );
|
|
|
|
#include "Thread.h"
|
|
|
|
#include <stop_token>
|
|
#include <condition_variable>
|
|
#include <thread>
|
|
|
|
#include <Alepha/Utility/StaticValue.h>
|
|
#include <Alepha/Utility/evaluation_helpers.h>
|
|
#include <Alepha/Utility/ScopedState.h>
|
|
|
|
#include "error.h"
|
|
|
|
namespace Alepha::Hydrogen ::detail:: Thread_m
|
|
{
|
|
namespace
|
|
{
|
|
// Compile-time switches for diagnostic output in this translation unit.
namespace C
{
	// Master switch: turning this on also turns on every category below.
	constexpr bool debug= false;

	// Gates the diagnostic printed by `Interlock::interlock` when it finds
	// that no thread is blocked on the other side.
	constexpr bool debugWaiters= C::debug or false;
}
|
|
|
|
struct ThreadState
|
|
{
|
|
std::mutex access;
|
|
std::stop_source source;
|
|
std::exception_ptr notification;
|
|
|
|
void
|
|
deliver( std::exception_ptr ptr )
|
|
{
|
|
std::lock_guard lock{ access };
|
|
if( notification ) return; // TODO: Don't swallow blocked interrupts?
|
|
notification= std::move( ptr );
|
|
source.request_stop();
|
|
}
|
|
|
|
void
|
|
interruption_point( std::lock_guard< std::mutex > & )
|
|
{
|
|
if( notification == nullptr ) return;
|
|
source= std::stop_source{};
|
|
auto next= std::move( notification );
|
|
notification= nullptr;
|
|
std::rethrow_exception( next );
|
|
}
|
|
|
|
void
|
|
interruption_point()
|
|
{
|
|
std::lock_guard lock( access );
|
|
interruption_point( lock );
|
|
}
|
|
|
|
std::stop_token
|
|
getToken()
|
|
{
|
|
std::lock_guard lock( access );
|
|
interruption_point( lock );
|
|
return source.get_token();
|
|
}
|
|
};
|
|
|
|
// The calling thread's `ThreadState`, held through a shared pointer and wrapped
// in `Utility::StaticValue`; the rest of this file reaches it as `state()`.
thread_local Utility::StaticValue< std::shared_ptr< ThreadState > > state;
|
|
}
|
|
|
|
struct ConditionVariable::Impl
|
|
{
|
|
std::condition_variable_any cond;
|
|
};
|
|
|
|
// Defaulted in this file, after `Impl` has been defined -- presumably so the
// `pimpl` member can be destroyed with `Impl` as a complete type.
ConditionVariable::~ConditionVariable()= default;
|
|
|
|
// Creates the condition variable by allocating its private `Impl`.
ConditionVariable::ConditionVariable()
	: pimpl( std::make_unique< Impl >() )
{
}
|
|
|
|
void
|
|
ConditionVariable::notify_one()
|
|
{
|
|
impl().cond.notify_one();
|
|
}
|
|
|
|
void
|
|
ConditionVariable::notify_all()
|
|
{
|
|
impl().cond.notify_all();
|
|
}
|
|
|
|
void
|
|
ConditionVariable::wait( unique_lock< mutex > &lock )
|
|
{
|
|
auto stop= Utility::evaluate <=[]
|
|
{
|
|
std::lock_guard lock( state()->access );
|
|
return state()->source.get_token();
|
|
};
|
|
impl().cond.wait( lock, stop, [val= std::uint64_t()] mutable { return val++; } );
|
|
state()->interruption_point();
|
|
}
|
|
|
|
namespace
|
|
{
|
|
// Phases of an `Interlock`, as advanced by `Interlock::waiter`,
// `Interlock::noblock` and `Interlock::interlock`.
enum State
{
	// Initial phase.  `waiter` advances it to `Waiting`.
	Open,

	// One caller has passed through `waiter`.  A second `waiter` call
	// advances it to `Synchronizing`.
	Waiting,

	// Two callers have passed through `waiter`.  `noblock` and `interlock`
	// advance it to `Opening`.
	Synchronizing,

	// Release in progress.  The side of `interlock` that was blocked in
	// `Waiting` resets it to `Open`; `waiter` blocks while it lasts.
	Opening,
};
|
|
}
|
|
|
|
struct Interlock::Impl
|
|
{
|
|
State state= Open;
|
|
bool threadBlocked= false;
|
|
|
|
ConditionVariable var;
|
|
};
|
|
|
|
// Creates the interlock by allocating its private `Impl` (initial phase: `Open`).
Interlock::Interlock() : pimpl( std::make_unique< Impl >() ) {}
|
|
// Defaulted in this file, after `Impl` has been defined -- presumably so the
// `pimpl` member can be destroyed with `Impl` as a complete type.
Interlock::~Interlock()= default;
|
|
|
|
bool
|
|
Interlock::waiter( unique_lock< mutex > &lock )
|
|
{
|
|
// Defensively, if a thread called `checkpoint`, we
|
|
// might encounter that thread here, and need
|
|
// that thread to terminate the previous interlock
|
|
// phase.
|
|
while( impl().state == Opening )
|
|
{
|
|
impl().var.wait( lock );
|
|
assert( not impl().threadBlocked );
|
|
}
|
|
|
|
if( impl().state == Open )
|
|
{
|
|
impl().state= Waiting;
|
|
return false;
|
|
}
|
|
else if( impl().state == Waiting )
|
|
{
|
|
impl().state= Synchronizing;
|
|
return true;
|
|
}
|
|
else { abort(); }
|
|
}
|
|
|
|
void
|
|
Interlock::noblock( unique_lock< mutex > &lock )
|
|
{
|
|
if( impl().state == Waiting )
|
|
{
|
|
// Nolock form will just leave the bit set.
|
|
assert( not impl().threadBlocked );
|
|
}
|
|
else if( impl().state == Synchronizing )
|
|
{
|
|
impl().state= Opening;
|
|
impl().var.notify_one();
|
|
}
|
|
else { abort(); }
|
|
}
|
|
|
|
void
|
|
Interlock::interlock( unique_lock< mutex > &lock )
|
|
{
|
|
if( impl().state == Waiting )
|
|
{
|
|
while( impl().state == Waiting )
|
|
{
|
|
Utility::ScopedState state{ impl().threadBlocked };
|
|
impl().var.wait( lock );
|
|
}
|
|
assert( not impl().threadBlocked );
|
|
impl().state= Open;
|
|
impl().var.notify_one();
|
|
}
|
|
else if( impl().state == Synchronizing )
|
|
{
|
|
impl().state= Opening;
|
|
if( impl().threadBlocked )
|
|
{
|
|
impl().var.notify_one();
|
|
|
|
while( impl().state == Opening )
|
|
{
|
|
impl().var.wait( lock );
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if( C::debugWaiters )
|
|
{
|
|
error() << "Not waiting on other side, as nobody's there." << std::endl;
|
|
}
|
|
}
|
|
}
|
|
else{ abort(); }
|
|
}
|
|
|
|
static auto init= Utility::enroll <=[]
|
|
{
|
|
state()= std::make_shared< ThreadState >();
|
|
};
|
|
|
|
struct Thread::Impl
|
|
{
|
|
std::thread thr;
|
|
std::weak_ptr< ThreadState > state;
|
|
};
|
|
|
|
// Defaulted in this file, after `Impl` has been defined.
// NOTE(review): `Impl` owns a `std::thread`, and destroying a `std::thread`
// that is still joinable calls `std::terminate` -- presumably callers are
// expected to `join()` first; confirm.
Thread::~Thread()= default;
|
|
|
|
// Starts a new thread that runs `start`.
//
// Before `start` is invoked, the new thread installs a freshly created
// `ThreadState` as its thread-local `state()`.  This object keeps only a
// `std::weak_ptr` to that state, which `notify` uses to deliver exceptions.
//
// NOTE(review): nothing here catches exceptions escaping `start`; an exception
// leaving a `std::thread` entry function calls `std::terminate`.
Thread::Thread( std::function< void () > start )
{
	auto newState= std::make_shared< ThreadState >();
	std::weak_ptr local= newState;

	// The lambda must be `mutable`: captures are const inside a non-mutable
	// lambda, so `std::move( newState )` would quietly fall back to a copy and
	// leave the closure holding an extra owning reference.
	auto entry= [newState= std::move( newState ), start= std::move( start )]() mutable
	{
		state()= std::move( newState );
		start();
	};

	pimpl= std::make_unique< Impl >( std::thread{ std::move( entry ) }, std::move( local ) );
}
|
|
|
|
void
|
|
Thread::join()
|
|
{
|
|
// TODO: Make interruptible.
|
|
impl().thr.join();
|
|
}
|
|
|
|
void
|
|
Thread::notify( std::exception_ptr p )
|
|
{
|
|
const auto state= impl().state.lock();
|
|
if( not state ) return;
|
|
|
|
state->deliver( std::move( p ) );
|
|
}
|
|
}
|