1
0
forked from Alepha/Alepha

Modernize thread code.

This commit is contained in:
2023-11-14 15:04:04 -05:00
parent c6b66aa7b5
commit b8d6c5aced
4 changed files with 208 additions and 149 deletions

View File

@ -13,6 +13,7 @@ add_library( alepha SHARED
ProgramOptions.cc ProgramOptions.cc
string_algorithms.cc string_algorithms.cc
word_wrap.cc word_wrap.cc
Thread.cc
) )
# Everything else depends upon it # Everything else depends upon it
link_libraries( alepha ) link_libraries( alepha )

130
Thread.cc Normal file
View File

@ -0,0 +1,130 @@
static_assert( __cplusplus > 2020'99 );
#include "Thread.h"
#include <stop_token>
#include <condition_variable>
#include <Alepha/Utility/StaticValue.h>
#include <Alepha/Utility/evaluation_helpers.h>
namespace Alepha::Hydrogen::detail::Thread_m
{
namespace
{
struct ThreadState
{
std::mutex access;
std::stop_source source;
std::exception_ptr notification;
void
deliver( std::exception_ptr ptr )
{
std::lock_guard lock{ access };
if( notification ) return; // TODO: Don't swallow blocked interrupts?
notification= std::move( ptr );
source.request_stop();
}
void
interruption_point( std::lock_guard< std::mutex > & )
{
if( notification == nullptr ) return;
source= std::stop_source{};
auto next= std::move( notification );
notification= nullptr;
std::rethrow_exception( next );
}
void
interruption_point()
{
std::lock_guard lock( access );
interruption_point( lock );
}
std::stop_token
getToken()
{
std::lock_guard lock( access );
interruption_point( lock );
return source.get_token();
}
};
thread_local Utility::StaticValue< std::shared_ptr< ThreadState > > state;
}
struct ConditionVariable::Impl
{
std::condition_variable_any cond;
};
ConditionVariable::~ConditionVariable()= default;
ConditionVariable::ConditionVariable()
: pimpl( std::make_unique< Impl >() ) {}
void
ConditionVariable::notify_one()
{
impl().cond.notify_one();
}
void
ConditionVariable::notify_all()
{
impl().cond.notify_all();
}
void
ConditionVariable::wait( unique_lock< mutex > &lock )
{
auto stop= Utility::evaluate <=[]
{
std::lock_guard lock( state()->access );
return state()->source.get_token();
};
impl().cond.wait( lock, stop, [val= std::uint64_t()] mutable { return val++; } );
state()->interruption_point();
}
struct Thread::Impl
{
std::thread thr;
std::weak_ptr< ThreadState > state;
};
Thread::~Thread()= default;
Thread::Thread( std::function< void () > start )
{
auto newState= std::make_shared< ThreadState >();
std::weak_ptr local= newState;
auto entry= [newState= std::move( newState ), start= std::move( start )]
{
state()= std::move( newState );
start();
};
pimpl= std::make_unique< Impl >( std::thread{ std::move( entry ) }, std::move( local ) );
}
void
Thread::join()
{
// TODO: Make interruptible.
impl().thr.join();
}
void
Thread::notify( std::exception_ptr p )
{
const auto state= impl().state.lock();
if( not state ) return;
state->deliver( std::move( p ) );
}
}

199
Thread.h
View File

@ -4,165 +4,94 @@ static_assert( __cplusplus > 2020'99 );
#include <Alepha/Alepha.h> #include <Alepha/Alepha.h>
#include <boost/thread.hpp> #include <mutex>
#include <boost/thread/mutex.hpp> #include <stop_token>
#include <boost/thread/condition_variable.hpp> #include <functional>
#include <exception>
#include <Alepha/Exception.h> #include <Alepha/Exception.h>
namespace Alepha::Hydrogen namespace Alepha::Hydrogen ::detail:: Thread_m
{ {
namespace detail::Thread_m inline namespace exports
{
inline namespace exports {}
namespace exports
{ {
using CrossThreadNotificationRethrowError= synthetic_exception< struct cross_thread_notification_failure, Error >; using CrossThreadNotificationRethrowError= synthetic_exception< struct cross_thread_notification_failure, Error >;
}
class NotificationInfo
{
private:
std::mutex access;
std::exception_ptr notification;
public:
//template( Concepts::DerivedFrom< Notification > Exc )
void
setNotification( std::exception_ptr &&exception )
{
std::lock_guard lock( access );
notification= std::move( exception );
}
template< typename Callable >
void
check_interrupt( Callable &&callable )
try
{
callable();
}
catch( const boost::thread_interrupted & )
{
std::lock_guard lock( access );
if( not notification ) throw;
try
{
std::rethrow_exception( std::move( notification ) );
}
catch( const std::bad_alloc & )
{
throw build_exception< CrossThreadNotificationRethrowError >( "`std::bad_alloc` encountered in trying to "
"raise a cross-thread notification" );
}
}
};
inline thread_local NotificationInfo notification;
namespace exports
{
class ConditionVariable
: private boost::condition_variable
{
public:
using condition_variable::notify_all;
using condition_variable::notify_one;
template< typename Lock >
void
wait( Lock &&lock )
{
notification.check_interrupt( [&]{ condition_variable::wait( std::forward< Lock >( lock ) ); } );
}
template< typename Lock, typename Predicate >
void
wait( Lock &&lock, Predicate &&predicate )
{
notification.check_interrupt( [&]{ condition_variable::wait( std::forward< Lock >( lock ),
std::forward< Predicate >( predicate ) ); } );
}
};
namespace this_thread namespace this_thread
{ {
template< typename Clock, typename Duration > //void sleep( std::chrono::duration );
void
sleep_until( const boost::chrono::time_point< Clock, Duration > &abs_time )
{
notification.check_interrupt( [&]{ boost::this_thread::sleep_until( abs_time ); } );
} }
#if 0 struct ThreadInterrupted;
template< typename Rep, typename Period > class Thread;
void using thread= Thread;
sleep_for( const boost::chrono::duration< Rep, Period > &rel_time ) class ConditionVariable;
{ using condition_variable= ConditionVariable;
notification.check_interrupt( [&]( boost::this_thread::sleep_until( rel_time ); } );
} using std::mutex;
#endif using std::lock_guard;
} using std::unique_lock;
} }
struct ThreadNotification class exports::ConditionVariable
{ {
NotificationInfo *myNotification= nullptr; private:
}; struct Impl;
std::unique_ptr< Impl > pimpl;
auto &impl() { return *pimpl; }
const auto &impl() const { return *pimpl; }
namespace exports
{
class Thread
: ThreadNotification, boost::thread
{
public: public:
template< typename Callable > ~ConditionVariable();
explicit ConditionVariable();
Thread( Callable &&callable )
: thread
(
[this, callable= std::forward< Callable >( callable )]
{
myNotification= &notification;
try { callable(); }
catch( const Notification & )
{
// Notifications are not fatal.
}
}
)
{}
using thread::join; void wait( unique_lock< mutex > & );
using thread::detach;
using thread::interrupt;
//template( Concepts::DerivedFrom< Notification > Exc )
template< typename Exc >
void void
interrupt( Exc &&exception ) wait( unique_lock< mutex > &lock, auto predicate )
try
{ {
throw std::forward< Exc >( exception ); while( not predicate() ) wait( lock );
}
catch( const Notification & )
{
myNotification->setNotification( std::current_exception() );
interrupt();
} }
void notify_one();
void notify_all();
}; };
using Mutex= boost::mutex; struct exports::ThreadInterrupted {};
using boost::mutex;
using boost::unique_lock; class exports::Thread
using boost::lock_guard; {
} private:
struct Impl;
std::unique_ptr< Impl > pimpl;
auto &impl() { return *pimpl; }
const auto &impl() const { return *pimpl; }
void notify( std::exception_ptr );
public:
~Thread();
explicit Thread( std::function< void () > start );
template< typename Notification >
void
notify( Notification notification )
{
notify( std::make_exception_ptr( std::move( notification ) ) );
} }
inline namespace exports {} void
namespace exports::inline Thread_m interrupt()
{
notify( std::make_exception_ptr( ThreadInterrupted{} ) );
}
void join();
};
}
namespace Alepha::Hydrogen::inline exports::inline Thread_m
{ {
using namespace detail::Thread_m::exports; using namespace detail::Thread_m::exports;
} }
}

View File

@ -23,8 +23,8 @@ namespace
"smoke"_test <=[] () -> bool "smoke"_test <=[] () -> bool
{ {
std::cerr << "Smoke started..." << std::endl; std::cerr << "Smoke started..." << std::endl;
Alepha::Mutex access; Alepha::mutex access;
Alepha::ConditionVariable cv; Alepha::condition_variable cv;
auto threadMain= [&] auto threadMain= [&]
{ {
try try
@ -34,7 +34,7 @@ namespace
cv.wait( lock ); cv.wait( lock );
std::cerr << "Child thread awoken illegally!" << std::endl; std::cerr << "Child thread awoken illegally!" << std::endl;
} }
catch( const boost::thread_interrupted & ) catch( const Alepha::ThreadInterrupted & )
{ {
std::cerr << "OOPS! We didn't get intercepted!" << std::endl; std::cerr << "OOPS! We didn't get intercepted!" << std::endl;
throw; throw;
@ -42,7 +42,6 @@ namespace
catch( const MyNotification &n ) catch( const MyNotification &n )
{ {
std::cerr << "I caught it: " << n.message() << "!" << std::endl; std::cerr << "I caught it: " << n.message() << "!" << std::endl;
throw;
} }
}; };
@ -52,7 +51,7 @@ namespace
std::cerr << "Child thread now launched..." << std::endl; std::cerr << "Child thread now launched..." << std::endl;
::sleep( 1 ); ::sleep( 1 );
access.unlock(); access.unlock();
thr.interrupt( Alepha::build_exception< MyNotification >( "My message" ) ); thr.notify( Alepha::build_exception< MyNotification >( "My message" ) );
thr.join(); thr.join();
return true; return true;