From b1d83a4bf013bd85284831ff711611613aaa0a4b Mon Sep 17 00:00:00 2001 From: ADAM David Alan Martin Date: Mon, 17 Oct 2022 22:51:39 -0400 Subject: [PATCH] Add a thread primitive which has support for notifications. Now you can send an `Alepha::Notification` into a thread as an interrupt. You have to use the Alepha threading primitives. --- Thread.h | 168 ++++++++++++++++++++++++++++++++++++++++++ Thread.test/Makefile | 10 +++ Thread.test/thread.cc | 70 ++++++++++++++++++ 3 files changed, 248 insertions(+) create mode 100644 Thread.h create mode 100644 Thread.test/Makefile create mode 100644 Thread.test/thread.cc diff --git a/Thread.h b/Thread.h new file mode 100644 index 0000000..4b4d196 --- /dev/null +++ b/Thread.h @@ -0,0 +1,168 @@ +static_assert( __cplusplus > 2020'00 ); + +#pragma once + +#include + +#include +#include +#include + +#include + +namespace Alepha::Hydrogen +{ + namespace detail::thread + { + inline namespace exports {} + namespace exports + { + using CrossThreadNotificationRethrowError= synthetic_exception< struct cross_thread_notification_failure, Error >; + } + + class NotificationInfo + { + private: + std::mutex access; + std::exception_ptr notification; + + public: + //template( Concepts::DerivedFrom< Notification > Exc ) + void + setNotification( std::exception_ptr &&exception ) + { + std::lock_guard lock( access ); + notification= std::move( exception ); + } + + template< typename Callable > + void + check_interrupt( Callable &&callable ) + try + { + callable(); + } + catch( const boost_ns::thread_interrupted & ) + { + std::lock_guard lock( access ); + if( not notification ) throw; + try + { + std::rethrow_exception( std::move( notification ) ); + } + catch( const std::bad_alloc & ) + { + throw build_exception< CrossThreadNotificationRethrowError >( "`std::bad_alloc` encountered in trying to " + "raise a cross-thread notification" ); + } + } + }; + + inline thread_local NotificationInfo notification; + + namespace exports + { + class ConditionVariable + : private boost_ns::condition_variable + { + public: + using condition_variable::notify_all; + using condition_variable::notify_one; + + template< typename Lock > + void + wait( Lock &&lock ) + { + notification.check_interrupt( [&]{ condition_variable::wait( std::forward< Lock >( lock ) ); } ); + } + + template< typename Lock, typename Predicate > + void + wait( Lock &&lock, Predicate &&predicate ) + { + notification.check_interrupt( [&]{ condition_variable::wait( std::forward< Lock >( lock ), + std::forward< Predicate >( predicate ) ); } ); + } + }; + + namespace this_thread + { + template< typename Clock, typename Duration > + void + sleep_until( const boost_ns::chrono::time_point< Clock, Duration > &abs_time ) + { + notification.check_interrupt( [&]{ boost_ns::this_thread::sleep_until( abs_time ); } ); + } + +#if 0 + template< typename Rep, typename Period > + void + sleep_for( const boost_ns::chrono::duration< Rep, Period > &rel_time ) + { + notification.check_interrupt( [&]( boost_ns::this_thread::sleep_until( rel_time ); } ); + } +#endif + } + } + + struct ThreadNotification + { + NotificationInfo *myNotification= nullptr; + }; + + namespace exports + { + class Thread + : ThreadNotification, boost_ns::thread + { + public: + template< typename Callable > + explicit + Thread( Callable &&callable ) + : thread + ( + [this, callable= std::forward< Callable >( callable )] + { + myNotification= ¬ification; + try { callable(); } + catch( const Notification & ) + { + // Notifications are not fatal. + } + } + ) + {} + + using thread::join; + using thread::detach; + + using thread::interrupt; + + //template( Concepts::DerivedFrom< Notification > Exc ) + template< typename Exc > + void + interrupt( Exc &&exception ) + try + { + throw std::forward< Exc >( exception ); + } + catch( const Notification & ) + { + myNotification->setNotification( std::current_exception() ); + interrupt(); + } + }; + + using Mutex= boost_ns::mutex; + using boost_ns::mutex; + using boost_ns::unique_lock; + using boost_ns::lock_guard; + } + } + + inline namespace exports {} + namespace exports::inline thread + { + using namespace detail::thread::exports; + } +} diff --git a/Thread.test/Makefile b/Thread.test/Makefile new file mode 100644 index 0000000..f2f34d1 --- /dev/null +++ b/Thread.test/Makefile @@ -0,0 +1,10 @@ +CXXFLAGS+= -std=c++2a -I ../ +CXXFLAGS+= -g -O0 +CXX=clang++-12 + +CXXFLAGS+= -Wno-inline-namespace-reopened-noninline +CXXFLAGS+= -Wno-unused-comparison + +LDLIBS+= -lboost_thread -lpthread + +all: thread diff --git a/Thread.test/thread.cc b/Thread.test/thread.cc new file mode 100644 index 0000000..4449096 --- /dev/null +++ b/Thread.test/thread.cc @@ -0,0 +1,70 @@ +static_assert( __cplusplus > 2020'00 ); + +#include + +#include + +#include +#include +#include + +namespace +{ + using Alepha::exports::types::argcnt_t; + using Alepha::exports::types::argvec_t; +} + +int +main( const argcnt_t argcnt, const argvec_t argvec ) +{ + return Alepha::Testing::runAllTests( argcnt, argvec ); +} + +namespace +{ + namespace util= Alepha::Utility; + using namespace Alepha::Testing::exports; + + using MyNotification= Alepha::create_exception< struct my_notification, Alepha::Notification >; + + auto tests= Alepha::Utility::enroll <=[] + { + "smoke"_test <=[] () -> bool + { + std::cerr << "Smoke started..." << std::endl; + Alepha::Mutex access; + Alepha::ConditionVariable cv; + auto threadMain= [&] + { + try + { + Alepha::unique_lock lock( access ); + std::cerr << "Child thread started..." << std::endl; + cv.wait( lock ); + std::cerr << "Child thread awoken illegally!" << std::endl; + } + catch( const boost::thread_interrupted & ) + { + std::cerr << "SHIT! We didn't get intercepted!" << std::endl; + throw; + } + catch( const MyNotification &n ) + { + std::cerr << "I caught it: " << n.message() << "!" << std::endl; + throw; + } + }; + + access.lock(); + std::cerr << "Launching child thread..." << std::endl; + Alepha::Thread thr( threadMain ); + std::cerr << "Child thread now launched..." << std::endl; + ::sleep( 1 ); + access.unlock(); + thr.interrupt( Alepha::build_exception< MyNotification >( "My message" ) ); + thr.join(); + + return true; + }; + }; +}