diff --git a/CMakeLists.txt b/CMakeLists.txt index 8d68e48..895f050 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,7 @@ add_library( alepha SHARED ProgramOptions.cc string_algorithms.cc word_wrap.cc + Thread.cc ) # Everything else depends upon it link_libraries( alepha ) diff --git a/Thread.cc b/Thread.cc new file mode 100644 index 0000000..d5f9dc0 --- /dev/null +++ b/Thread.cc @@ -0,0 +1,130 @@ +static_assert( __cplusplus > 2020'99 ); + +#include "Thread.h" + +#include +#include + +#include +#include + +namespace Alepha::Hydrogen::detail::Thread_m +{ + namespace + { + struct ThreadState + { + std::mutex access; + std::stop_source source; + std::exception_ptr notification; + + void + deliver( std::exception_ptr ptr ) + { + std::lock_guard lock{ access }; + if( notification ) return; // TODO: Don't swallow blocked interrupts? + notification= std::move( ptr ); + source.request_stop(); + } + + void + interruption_point( std::lock_guard< std::mutex > & ) + { + if( notification == nullptr ) return; + source= std::stop_source{}; + auto next= std::move( notification ); + notification= nullptr; + std::rethrow_exception( next ); + } + + void + interruption_point() + { + std::lock_guard lock( access ); + interruption_point( lock ); + } + + std::stop_token + getToken() + { + std::lock_guard lock( access ); + interruption_point( lock ); + return source.get_token(); + } + }; + + thread_local Utility::StaticValue< std::shared_ptr< ThreadState > > state; + } + + struct ConditionVariable::Impl + { + std::condition_variable_any cond; + }; + + ConditionVariable::~ConditionVariable()= default; + + ConditionVariable::ConditionVariable() + : pimpl( std::make_unique< Impl >() ) {} + + void + ConditionVariable::notify_one() + { + impl().cond.notify_one(); + } + + void + ConditionVariable::notify_all() + { + impl().cond.notify_all(); + } + + void + ConditionVariable::wait( unique_lock< mutex > &lock ) + { + auto stop= Utility::evaluate <=[] + { + std::lock_guard lock( state()->access ); + return state()->source.get_token(); + }; + impl().cond.wait( lock, stop, [val= std::uint64_t()] mutable { return val++; } ); + state()->interruption_point(); + } + + struct Thread::Impl + { + std::thread thr; + std::weak_ptr< ThreadState > state; + }; + + Thread::~Thread()= default; + + Thread::Thread( std::function< void () > start ) + { + auto newState= std::make_shared< ThreadState >(); + std::weak_ptr local= newState; + + auto entry= [newState= std::move( newState ), start= std::move( start )] + { + state()= std::move( newState ); + start(); + }; + + pimpl= std::make_unique< Impl >( std::thread{ std::move( entry ) }, std::move( local ) ); + } + + void + Thread::join() + { + // TODO: Make interruptible. + impl().thr.join(); + } + + void + Thread::notify( std::exception_ptr p ) + { + const auto state= impl().state.lock(); + if( not state ) return; + + state->deliver( std::move( p ) ); + } +} diff --git a/Thread.h b/Thread.h index 6e23adf..19d35a3 100644 --- a/Thread.h +++ b/Thread.h @@ -4,165 +4,94 @@ static_assert( __cplusplus > 2020'99 ); #include -#include -#include -#include +#include +#include +#include +#include #include -namespace Alepha::Hydrogen +namespace Alepha::Hydrogen ::detail:: Thread_m { - namespace detail::Thread_m + inline namespace exports { - inline namespace exports {} - namespace exports + using CrossThreadNotificationRethrowError= synthetic_exception< struct cross_thread_notification_failure, Error >; + namespace this_thread { - using CrossThreadNotificationRethrowError= synthetic_exception< struct cross_thread_notification_failure, Error >; + //void sleep( std::chrono::duration ); } - class NotificationInfo - { - private: - std::mutex access; - std::exception_ptr notification; + struct ThreadInterrupted; + class Thread; + using thread= Thread; + class ConditionVariable; + using condition_variable= ConditionVariable; - public: - //template( Concepts::DerivedFrom< Notification > Exc ) - void - setNotification( std::exception_ptr &&exception ) - { - std::lock_guard lock( access ); - notification= std::move( exception ); - } + using std::mutex; + using std::lock_guard; + using std::unique_lock; + } - template< typename Callable > - void - check_interrupt( Callable &&callable ) - try - { - callable(); - } - catch( const boost::thread_interrupted & ) - { - std::lock_guard lock( access ); - if( not notification ) throw; - try - { - std::rethrow_exception( std::move( notification ) ); - } - catch( const std::bad_alloc & ) - { - throw build_exception< CrossThreadNotificationRethrowError >( "`std::bad_alloc` encountered in trying to " - "raise a cross-thread notification" ); - } - } - }; - - inline thread_local NotificationInfo notification; + class exports::ConditionVariable + { + private: + struct Impl; + std::unique_ptr< Impl > pimpl; - namespace exports - { - class ConditionVariable - : private boost::condition_variable + auto &impl() { return *pimpl; } + const auto &impl() const { return *pimpl; } + + public: + ~ConditionVariable(); + ConditionVariable(); + + void wait( unique_lock< mutex > & ); + + void + wait( unique_lock< mutex > &lock, auto predicate ) { - public: - using condition_variable::notify_all; - using condition_variable::notify_one; - - template< typename Lock > - void - wait( Lock &&lock ) - { - notification.check_interrupt( [&]{ condition_variable::wait( std::forward< Lock >( lock ) ); } ); - } - - template< typename Lock, typename Predicate > - void - wait( Lock &&lock, Predicate &&predicate ) - { - notification.check_interrupt( [&]{ condition_variable::wait( std::forward< Lock >( lock ), - std::forward< Predicate >( predicate ) ); } ); - } - }; - - namespace this_thread - { - template< typename Clock, typename Duration > - void - sleep_until( const boost::chrono::time_point< Clock, Duration > &abs_time ) - { - notification.check_interrupt( [&]{ boost::this_thread::sleep_until( abs_time ); } ); - } - -#if 0 - template< typename Rep, typename Period > - void - sleep_for( const boost::chrono::duration< Rep, Period > &rel_time ) - { - notification.check_interrupt( [&]( boost::this_thread::sleep_until( rel_time ); } ); - } -#endif + while( not predicate() ) wait( lock ); } - } - struct ThreadNotification - { - NotificationInfo *myNotification= nullptr; - }; - - namespace exports - { - class Thread - : ThreadNotification, boost::thread - { - public: - template< typename Callable > - explicit - Thread( Callable &&callable ) - : thread - ( - [this, callable= std::forward< Callable >( callable )] - { - myNotification= ¬ification; - try { callable(); } - catch( const Notification & ) - { - // Notifications are not fatal. - } - } - ) - {} + void notify_one(); + void notify_all(); + }; - using thread::join; - using thread::detach; - - using thread::interrupt; + struct exports::ThreadInterrupted {}; - //template( Concepts::DerivedFrom< Notification > Exc ) - template< typename Exc > - void - interrupt( Exc &&exception ) - try - { - throw std::forward< Exc >( exception ); - } - catch( const Notification & ) - { - myNotification->setNotification( std::current_exception() ); - interrupt(); - } - }; - - using Mutex= boost::mutex; - using boost::mutex; - using boost::unique_lock; - using boost::lock_guard; - } - } - - inline namespace exports {} - namespace exports::inline Thread_m + class exports::Thread { - using namespace detail::Thread_m::exports; - } + private: + struct Impl; + std::unique_ptr< Impl > pimpl; + + auto &impl() { return *pimpl; } + const auto &impl() const { return *pimpl; } + + void notify( std::exception_ptr ); + + public: + ~Thread(); + explicit Thread( std::function< void () > start ); + + template< typename Notification > + void + notify( Notification notification ) + { + notify( std::make_exception_ptr( std::move( notification ) ) ); + } + + void + interrupt() + { + notify( std::make_exception_ptr( ThreadInterrupted{} ) ); + } + + void join(); + }; +} + +namespace Alepha::Hydrogen::inline exports::inline Thread_m +{ + using namespace detail::Thread_m::exports; } diff --git a/Thread.test/thread.cc b/Thread.test/thread.cc index b19858e..5f41a6a 100644 --- a/Thread.test/thread.cc +++ b/Thread.test/thread.cc @@ -23,8 +23,8 @@ namespace "smoke"_test <=[] () -> bool { std::cerr << "Smoke started..." << std::endl; - Alepha::Mutex access; - Alepha::ConditionVariable cv; + Alepha::mutex access; + Alepha::condition_variable cv; auto threadMain= [&] { try @@ -34,7 +34,7 @@ namespace cv.wait( lock ); std::cerr << "Child thread awoken illegally!" << std::endl; } - catch( const boost::thread_interrupted & ) + catch( const Alepha::ThreadInterrupted & ) { std::cerr << "OOPS! We didn't get intercepted!" << std::endl; throw; @@ -42,7 +42,6 @@ namespace catch( const MyNotification &n ) { std::cerr << "I caught it: " << n.message() << "!" << std::endl; - throw; } }; @@ -52,7 +51,7 @@ namespace std::cerr << "Child thread now launched..." << std::endl; ::sleep( 1 ); access.unlock(); - thr.interrupt( Alepha::build_exception< MyNotification >( "My message" ) ); + thr.notify( Alepha::build_exception< MyNotification >( "My message" ) ); thr.join(); return true;