static_assert( __cplusplus > 2020'99 ); #include "Thread.h" #include #include #include #include #include #include #include "error.h" namespace Alepha::Hydrogen ::detail:: Thread_m { namespace { namespace C { const bool debug= false; const bool debugWaiters= false or C::debug; } struct ThreadState { std::mutex access; std::stop_source source; std::exception_ptr notification; void deliver( std::exception_ptr ptr ) { std::lock_guard lock{ access }; if( notification ) return; // TODO: Don't swallow blocked interrupts? notification= std::move( ptr ); source.request_stop(); } void interruption_point( std::lock_guard< std::mutex > & ) { if( notification == nullptr ) return; source= std::stop_source{}; auto next= std::move( notification ); notification= nullptr; std::rethrow_exception( next ); } void interruption_point() { std::lock_guard lock( access ); interruption_point( lock ); } std::stop_token getToken() { std::lock_guard lock( access ); interruption_point( lock ); return source.get_token(); } }; thread_local Utility::StaticValue< std::shared_ptr< ThreadState > > state; } struct ConditionVariable::Impl { std::condition_variable_any cond; }; ConditionVariable::~ConditionVariable()= default; ConditionVariable::ConditionVariable() : pimpl( std::make_unique< Impl >() ) {} void ConditionVariable::notify_one() { impl().cond.notify_one(); } void ConditionVariable::notify_all() { impl().cond.notify_all(); } void ConditionVariable::wait( unique_lock< mutex > &lock ) { auto stop= Utility::evaluate <=[] { std::lock_guard lock( state()->access ); return state()->source.get_token(); }; impl().cond.wait( lock, stop, [val= std::uint64_t()] mutable { return val++; } ); state()->interruption_point(); } namespace { enum State { Open, Waiting, Synchronizing, Opening, }; } struct Interlock::Impl { State state= Open; bool threadBlocked= false; ConditionVariable var; }; Interlock::Interlock() : pimpl( std::make_unique< Impl >() ) {} Interlock::~Interlock()= default; bool Interlock::waiter( unique_lock< mutex > &lock ) { // Defensively, if a thread called `checkpoint`, we // might encounter that thread here, and need // that thread to terminate the previous interlock // phase. while( impl().state == Opening ) { impl().var.wait( lock ); assert( not impl().threadBlocked ); } if( impl().state == Open ) { impl().state= Waiting; return false; } else if( impl().state == Waiting ) { impl().state= Synchronizing; return true; } else { abort(); } } void Interlock::noblock( unique_lock< mutex > &lock ) { if( impl().state == Waiting ) { // Nolock form will just leave the bit set. assert( not impl().threadBlocked ); } else if( impl().state == Synchronizing ) { impl().state= Opening; impl().var.notify_one(); } else { abort(); } } void Interlock::interlock( unique_lock< mutex > &lock ) { if( impl().state == Waiting ) { while( impl().state == Waiting ) { Utility::ScopedState state{ impl().threadBlocked }; impl().var.wait( lock ); } assert( not impl().threadBlocked ); impl().state= Open; impl().var.notify_one(); } else if( impl().state == Synchronizing ) { impl().state= Opening; if( impl().threadBlocked ) { impl().var.notify_one(); while( impl().state == Opening ) { impl().var.wait( lock ); } } else { if( C::debugWaiters ) { error() << "Not waiting on other side, as nobody's there." << std::endl; } } } else{ abort(); } } static auto init= Utility::enroll <=[] { state()= std::make_shared< ThreadState >(); }; struct Thread::Impl { std::thread thr; std::weak_ptr< ThreadState > state; }; Thread::~Thread()= default; Thread::Thread( std::function< void () > start ) { auto newState= std::make_shared< ThreadState >(); std::weak_ptr local= newState; auto entry= [newState= std::move( newState ), start= std::move( start )] { state()= std::move( newState ); start(); }; pimpl= std::make_unique< Impl >( std::thread{ std::move( entry ) }, std::move( local ) ); } void Thread::join() { // TODO: Make interruptible. impl().thr.join(); } void Thread::notify( std::exception_ptr p ) { const auto state= impl().state.lock(); if( not state ) return; state->deliver( std::move( p ) ); } }