1
0
forked from Alepha/Alepha
Files
Alepha/Thread.cc

247 lines
4.6 KiB
C++

// Refuse to build unless the compiler reports a language standard newer than
// C++20 (`__cplusplus` must exceed 202099).  This file uses post-C++20 syntax,
// e.g. a `mutable` lambda written without a parameter list.
static_assert( __cplusplus > 2020'99 );
#include "Thread.h"
#include <stop_token>
#include <condition_variable>
#include <thread>
#include <Alepha/Utility/StaticValue.h>
#include <Alepha/Utility/evaluation_helpers.h>
#include <Alepha/Utility/ScopedState.h>
#include "error.h"
namespace Alepha::Hydrogen ::detail:: Thread_m
{
namespace
{
// Compile-time switches for diagnostic output in this translation unit.
namespace C
{
    // Master switch; turning it on also turns on every flag that is or-ed
    // with it below.
    constexpr bool debug{ false };

    // Diagnostics about interlock partners; enabled on its own or via `debug`.
    constexpr bool debugWaiters{ debug or false };
}
// Interruption bookkeeping for one thread.
//
// Another thread hands over an exception with `deliver`; the owning thread
// observes it at an interruption point, where it is rethrown.  `source` is the
// stop source whose tokens let blocking waits be cut short once something has
// been delivered.  `access` guards `source` and `notification`.
struct ThreadState
{
    std::mutex access;
    std::stop_source source;
    std::exception_ptr notification;

    // Store `ptr` as the pending notification and request a stop on the
    // current `source`.
    //
    // A null `ptr` is ignored.  Requesting a stop without a notification to
    // deliver would leave `source` permanently stopped: `interruption_point`
    // only replaces the source when a notification exists, so every later
    // stop-token wait would return immediately.
    //
    // If a notification is already pending, the new one is dropped.
    void
    deliver( std::exception_ptr ptr )
    {
        if( ptr == nullptr ) return;

        std::lock_guard lock{ access };
        if( notification ) return; // TODO: Don't swallow blocked interrupts?
        notification= std::move( ptr );
        source.request_stop();
    }

    // Interruption point for callers that already hold `access` (the
    // lock_guard parameter only serves as proof of that).
    //
    // When a notification is pending: install a fresh stop source, clear the
    // pending slot, and rethrow the stored exception.  Otherwise do nothing.
    void
    interruption_point( std::lock_guard< std::mutex > & )
    {
        if( notification == nullptr ) return;

        // The old source has had a stop requested; start over with a new one
        // so that later waits are not cut short by this same notification.
        source= std::stop_source{};

        auto next= std::move( notification );
        notification= nullptr;
        std::rethrow_exception( next );
    }

    // Interruption point for callers that do not hold `access`.
    void
    interruption_point()
    {
        std::lock_guard lock( access );
        interruption_point( lock );
    }

    // Act as an interruption point, then return a token for the current
    // stop source.  Throws the pending notification, if there is one.
    std::stop_token
    getToken()
    {
        std::lock_guard lock( access );
        interruption_point( lock );
        return source.get_token();
    }
};
// The calling thread's handle to its `ThreadState`; `thread_local`, so each thread has its own instance.
// NOTE(review): the shared_ptr appears to be null until something assigns through `state()= ...` -- confirm against `Utility::StaticValue`.
thread_local Utility::StaticValue< std::shared_ptr< ThreadState > > state;
}
// Private implementation of `ConditionVariable`.  The `_any` flavour of the
// standard condition variable is used because `wait` passes it a stop token.
struct ConditionVariable::Impl
{
    std::condition_variable_any cond;
};

// Allocates the implementation object.
ConditionVariable::ConditionVariable()
    : pimpl( std::make_unique< Impl >() )
{
}

// Defaulted in this file, where `Impl` is a complete type.
ConditionVariable::~ConditionVariable()= default;
// Forward to `notify_one` on the underlying standard condition variable.
void
ConditionVariable::notify_one()
{
    std::condition_variable_any &underlying= impl().cond;
    underlying.notify_one();
}
// Forward to `notify_all` on the underlying standard condition variable.
void
ConditionVariable::notify_all()
{
    std::condition_variable_any &underlying= impl().cond;
    underlying.notify_all();
}
// Block on the condition variable until it is notified, wakes spuriously, or
// the calling thread is sent a notification; in the last case the delivered
// exception is rethrown from here.
//
// `lock` is handed to the underlying `std::condition_variable_any::wait`,
// which releases it while blocked and re-acquires it before returning.
void
ConditionVariable::wait( unique_lock< mutex > &lock )
{
    // In this file only the static initializer and the `Thread` entry lambda
    // install a `ThreadState`.  A thread started any other way would otherwise
    // dereference a null pointer below, so give it a state on first use.
    if( not state() ) state()= std::make_shared< ThreadState >();

    // Snapshot the stop token under the state's mutex.  If a notification is
    // already pending, the token is already stopped and the wait below returns
    // immediately.
    auto stop= Utility::evaluate <=[]
    {
        std::lock_guard guard( state()->access );
        return state()->source.get_token();
    };

    // The predicate is false on its first evaluation and true afterwards, so
    // this behaves like a single predicate-less wait that a stop request can
    // also end.
    impl().cond.wait( lock, stop, [wakeups= std::uint64_t()] mutable { return wakeups++; } );

    // Rethrow whatever was delivered while we were blocked, if anything.
    state()->interruption_point();
}
namespace
{
    // Phases of the two-party rendezvous tracked in `Interlock::Impl::state`.
    enum State
    {
        // No participant has registered.
        Open= 0,

        // The first participant has registered (`waiter` returned false).
        Waiting= 1,

        // The second participant has registered (`waiter` returned true).
        Synchronizing= 2,

        // The second participant has finished and the cycle is being closed.
        Opening= 3,
    };
}
// Private implementation of `Interlock`.
struct Interlock::Impl
{
    // Current phase of the rendezvous.
    State state{ Open };

    // Read by `interlock` to decide whether a partner needs waking.
    bool threadBlocked{ false };

    // Signalled on phase changes that another participant may be waiting for.
    ConditionVariable var;
};

// Defaulted in this file, where `Impl` is a complete type.
Interlock::~Interlock()= default;

// Allocates the implementation object in its initial (`Open`) phase.
Interlock::Interlock()
    : pimpl( std::make_unique< Impl >() )
{
}
// Register the calling thread's arrival at the interlock.
//
// Returns false for the first arrival (phase goes `Open` -> `Waiting`) and
// true for the second (phase goes `Waiting` -> `Synchronizing`).  Arriving in
// any other phase aborts the process.
bool
Interlock::waiter( unique_lock< mutex > &lock )
{
    auto &self= impl();

    // Defensive: a thread that went through `checkpoint` may come back here
    // while the previous cycle is still being closed.  Hold it until the
    // phase has left `Opening`.
    while( self.state == Opening )
    {
        self.var.wait( lock );
        assert( not self.threadBlocked );
    }

    switch( self.state )
    {
        case Open:
            self.state= Waiting;
            return false;

        case Waiting:
            self.state= Synchronizing;
            return true;

        default:
            abort();
    }
}
// Non-blocking counterpart of `interlock`, for a thread that has already
// registered through `waiter`.  The `lock` parameter is not used here.
//
// First arrival (`Waiting`): nothing changes; the phase is left as it is.
// Second arrival (`Synchronizing`): move to `Opening` and wake one waiter.
// Any other phase aborts the process.
void
Interlock::noblock( unique_lock< mutex > &lock )
{
    auto &self= impl();

    switch( self.state )
    {
        case Waiting:
            // The non-blocking form simply leaves the phase set.
            assert( not self.threadBlocked );
            return;

        case Synchronizing:
            self.state= Opening;
            self.var.notify_one();
            return;

        default:
            abort();
    }
}
// Blocking step of the rendezvous, for a thread that has already registered through `waiter`.
// On entry the phase must be `Waiting` (first arrival) or `Synchronizing` (second arrival);
// any other phase aborts the process.
void
Interlock::interlock( unique_lock< mutex > &lock )
{
if( impl().state == Waiting )
{
// First arrival: sleep until the partner moves the phase off `Waiting`.
while( impl().state == Waiting )
{
// Presumably marks `threadBlocked` for the duration of this scope and restores it on exit -- confirm against `Utility::ScopedState`.
Utility::ScopedState state{ impl().threadBlocked };
impl().var.wait( lock );
}
assert( not impl().threadBlocked );
// Close the cycle and wake one thread waiting for the phase to change.
impl().state= Open;
impl().var.notify_one();
}
else if( impl().state == Synchronizing )
{
// Second arrival: start closing the cycle.
impl().state= Opening;
if( impl().threadBlocked )
{
// The partner is asleep in the branch above: wake it, then wait until the phase leaves `Opening`.
impl().var.notify_one();
while( impl().state == Opening )
{
impl().var.wait( lock );
}
}
else
{
// NOTE(review): nothing on this path moves the phase off `Opening`, and `waiter` loops while the phase is `Opening` -- confirm that some other path resets it.
if( C::debugWaiters )
{
error() << "Not waiting on other side, as nobody's there." << std::endl;
}
}
}
else{ abort(); }
}
// Gives the thread that runs this lambda a `ThreadState` of its own.
// NOTE(review): presumably `Utility::enroll` runs the lambda during static initialization -- confirm against its definition.
static auto init= Utility::enroll <=[]
{
state()= std::make_shared< ThreadState >();
};
// Private implementation of `Thread`.
// Member order matters: the `Thread` constructor fills this struct positionally via `std::make_unique< Impl >( thread, weak_ptr )`.
struct Thread::Impl
{
// The underlying standard thread.
std::thread thr;
// Non-owning reference to the spawned thread's `ThreadState`; `Thread::notify` locks it and does nothing if the state is gone.
std::weak_ptr< ThreadState > state;
};
// Defaulted in this file, where `Impl` is a complete type.
// NOTE(review): destroying a `std::thread` that is still joinable calls `std::terminate`, so callers presumably must `join()` first -- confirm intended contract.
Thread::~Thread()= default;
// Start a new thread that runs `start`.
//
// A fresh `ThreadState` is created for the new thread.  The thread installs it
// as its thread-local state before running `start`, and this `Thread` keeps a
// weak reference so that `notify` can reach it for as long as it exists.
Thread::Thread( std::function< void () > start )
{
    auto newState= std::make_shared< ThreadState >();
    std::weak_ptr local= newState;

    // The closure must be `mutable`: captures of a non-mutable lambda are
    // const inside its body, so `std::move( newState )` would silently copy
    // the shared_ptr instead of handing it over to the thread-local slot.
    auto entry= [newState= std::move( newState ), start= std::move( start )] () mutable
    {
        state()= std::move( newState );
        start();
    };

    pimpl= std::make_unique< Impl >( std::thread{ std::move( entry ) }, std::move( local ) );
}
// Block until the underlying thread has finished.
void
Thread::join()
{
    // TODO: Make interruptible.
    std::thread &worker= impl().thr;
    worker.join();
}
// Hand the exception `p` to the target thread's `ThreadState`.
// Does nothing when that state no longer exists.
void
Thread::notify( std::exception_ptr p )
{
    if( const auto target= impl().state.lock() )
    {
        target->deliver( std::move( p ) );
    }
}
}