diff --git a/Atomic/CMakeLists.txt b/Atomic/CMakeLists.txt new file mode 100644 index 0000000..beefa7d --- /dev/null +++ b/Atomic/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory( Mailbox.test ) diff --git a/Atomic/Mailbox.h b/Atomic/Mailbox.h index 93733c5..fc8a276 100644 --- a/Atomic/Mailbox.h +++ b/Atomic/Mailbox.h @@ -1,3 +1,5 @@ +static_assert( __cplusplus > 2020'99 ); + /*! * @file Mailbox.h * @brief Class which abstracts a "mailbox" metaphor, for threaded programming @@ -6,188 +8,214 @@ * @author ADAM David Alan Martin */ -#ifndef ALEPHA_MAILBOX_HEADER -#define ALEPHA_MAILBOX_HEADER +#include #include -#include +#include +#include +#include +#include -#include -#include - -namespace Alepha +namespace Alepha::Hydrogen::Atomic ::detail:: Mailbox_m { - namespace Atomic - { - /*! - * @brief The Mailbox class implements a mailbox metaphor. Just like postal service mailboxes, - * the mailbox class lets users add items to the box from a producer thread or group of threads, - * and in a (single, for now) consumer thread, take the entire contents of the mailbox out, and - * work with them. - * - * @tparam Item The Item type which is used in the mailbox containers. - * - * @note Mailboxes, for now, only support one consumer, and any number of producers. - * - * @invariant Mailboxes give out their contents in the order placed in. - * - * The Mailbox primitive is implemented as two "containers" which are swapped periodically by the - * consumer, and continually filled by the producers. This swapping keeps the identity in principle, - * but allows the user to access the contents of the "other side", in practice. The two mailbox - * containers are: "preparation" and "filled". The consumer takes a whole "filled" mailbox out, and - * replaces his current mailbox into the system for use as the new "preparation" box. The producer - * continually inserts mail into the "preparation" mailbox until full. The consumer clears his mailbox - * when returning it. - */ - template< typename Item > - class Mailbox - { - private: - /*! @brief The maximum number of "mail" items which can be placed into a "preparation" mailbox */ - const size_t boxLimit; + inline namespace exports + { + template< typename > class Mailbox; - const size_t minSwapFill; + using MailboxFinishedException= synthetic_exception< struct finished_exception, FinishedException >; + using AnyTaggedMailboxFinishedException= AnyTagged< MailboxFinishedException >; + template< typename tag > using TaggedMailboxFinishedException= Tagged< MailboxFinishedException, tag >; - boost::mutex mailboxAccess; + using MailboxFinishedCondition= synthetic_exception< struct finished_exception, FinishedCondition, MailboxFinishedException >; + using AnyTaggedMailboxFinishedCondition= AnyTagged< MailboxFinishedCondition >; + template< typename tag > using TaggedMailboxFinishedCondition= Tagged< MailboxFinishedCondition, tag >; + } - boost::condition boxFull; - boost::condition boxEmpty; + template< typename T > + std::size_t + computeWeight( const T & ) + { + return sizeof( T ); + } - std::vector< Item > preparing; - std::vector< Item > filled; + struct no_tag {}; - bool finished; - bool terminated; + /*! + * @brief The Mailbox class implements a mailbox metaphor. Just like postal service mailboxes, + * the mailbox class lets users add items to the box from a producer thread or group of threads, + * and in a (single, for now) consumer thread, take the entire contents of the mailbox out, and + * work with them. + * + * @tparam Item The Item type which is used in the mailbox containers. + * + * @note Mailboxes, for now, only support one consumer, and any number of producers. + * + * @invariant Mailboxes give out their contents in the order placed in. + * + * The Mailbox primitive is implemented as two "containers" which are swapped periodically by the + * consumer, and continually filled by the producers. This swapping keeps the identity in principle, + * but allows the user to access the contents of the "other side", in practice. The two mailbox + * containers are: "preparation" and "filled". The consumer takes a whole "filled" mailbox out, and + * replaces his current mailbox into the system for use as the new "preparation" box. The producer + * continually inserts mail into the "preparation" mailbox until full. The consumer clears his mailbox + * when returning it. + */ + template< typename Item > + class exports::Mailbox + { + public: + using FinishedException= synthetic_exception< struct finished_exception, MailboxFinishedException >; + //using AnyTaggedFinishedException= AnyTagged< FinishedException >; + //template< typename tag > using TaggedFinishedException= Tagged< FinishedException, tag >; - bool - testTerminated() const - { - if( terminated ) throw Alepha::TerminatedException(); - return true; - } + using FinishedCondition= synthetic_exception< struct finished_exception, MailboxFinishedCondition, FinishedException >; + //using AnyTaggedFinishedCondition= AnyTagged< FinishedCondition >; + //template< typename tag > using TaggedFinishedCondition= Tagged< FinishedCondition, tag >; - bool - testFinished() const - { - if( finished ) throw Alepha::FinishedException(); - return true; - } + private: + /*! @brief The maximum weight of "mail" items which can be placed into a "preparation" mailbox */ + const std::size_t boxLimit; - public: - explicit inline - Mailbox( const size_t lim, const size_t min= 1 ) - : boxLimit( lim ), minSwapFill( min ), finished( false ), terminated( false ) - { - this->preparing.reserve( boxLimit ); - this->filled.reserve( boxLimit ); - } + Alepha::mutex access; + Alepha::condition_variable ready; + bool suspended= false; - /*! - * @brief This function will give back the "now-current" preparation mailbox, and check out the current - * mail as the filled mailbox. - * @returns A reference to the current filled mailbox. - * - * @throw ClosureException When a closure event is encountered. (Any ClosureException derived type, see - * the ClosureException schedule for why these events get thrown.) - * - * @pre The mailbox must have some items in it. This is established for you by the internal locking system. - * @post The preparing mailbox is cleared, and the filled mailbox has the new mail. - * @invariant The filled mailbox will have some items in it, or a Alepha::ClosureException will be thrown. - * @invariant This function is not threadsafe in the presence of multiple consumers (fetchers). - */ - std::vector< Item > & - fetchItems() - { - boost::this_thread::interruption_point(); - /* Our mailbox is done, so let's empty it. (Don't hold the lock during this, to reduce contention.) */ - this->filled.clear(); + std::size_t weight= 0; + std::vector< Item > preparing; - ALEPHA_TEMPLATE_NAMED_USAGE( lock, this->mailboxAccess ) - { - /* Make sure that there's mail we can get. */ - while( this->testTerminated() && ( this->preparing.size() < minSwapFill ) ) - //while( ( this->testTerminated() ) && ( this->preparing.empty() ) ) - { - //this->testFinished(); - if( this->finished && !this->preparing.empty() ) - { - break; - } - this->testFinished(); - this->boxFull.wait( lock ); - } - assert( !this->preparing.empty() ); + std::vector< Item > filled; + std::vector< Item >::iterator pos= begin( filled ); - /* Exchange our mailbox with the preparation box. And give it back. */ - std::swap( this->filled, this->preparing ); - this->boxEmpty.notify_all(); - } - return this->filled; - } + bool finished= false; - /*! - * @brief This function will add mail to the mailbox. - * - * @throw ClosureException When a closure event is encountered. (Any ClosureException derived type, see - * the ClosureException schedule for why these events get thrown.) - * - * @pre The mailbox must have room. This is established for you by the internal locking system. - * @post The preparing mailbox will have "item" as the last item added to this box. - * @invariant The mailbox will not be allowed to become overfull. - */ - void - push_back( const Item &item ) - { - boost::this_thread::interruption_point(); - ALEPHA_TEMPLATE_NAMED_USAGE( lock, this->mailboxAccess ) - { - /* Make sure we can put the mail in */ - while( ( this->testTerminated() ) && ( this->preparing.size() == this->boxLimit ) ) - { - this->boxEmpty.wait( lock ); - } - assert( !this->preparing.size() < this->boxLimit ); + bool mustFinish= false; - /* Put the item into the mailbox. */ - this->preparing.push_back( item ); - this->boxFull.notify_one(); - } - } + template< typename Lock > + void + awaken( Lock &lock ) + { + swap( filled, preparing ); + pos= begin( filled ); + preparing.clear(); + suspended= false; + weight= 0; + ready.notify_one(); + } - /*! - * @brief This function will close the mailbox to any new mail, and deliver a closure - * event to the other side. - * - * @pre The mailbox must not be finished. - * @pre The mailbox will be put into the finished state. - */ - void - close() - { - ALEPHA_TEMPLATE_SCOPED_USAGE( this->mailboxAccess ) - { - this->finished= true; - this->boxFull.notify_all(); - } - } + template< typename Lock > + void + interlock( Lock &lock ) + { + error() << "Interlock entered." << std::endl; + if( suspended ) awaken( lock ); + else + { + suspended= true; + while( suspended ) ready.wait( lock ); + } + } - void - terminate() - { - ALEPHA_TEMPLATE_SCOPED_USAGE( this->mailboxAccess ) - { - this->terminated= true; - this->boxEmpty.notify_all(); - this->boxFull.notify_all(); - } - } - }; - } + [[noreturn]] void + raiseFinished() + { + //if constexpr( std::is_same_v< tag, no_tag > ) + throw build_exception< FinishedCondition >( "" ); + //else throw TaggedFinishedCondition< tag >{}; + abort(); + } + + public: + explicit inline + Mailbox( const size_t lim ) + : boxLimit( lim ) {} + + /*! + * @todo Fix dox... + * @brief This function will give back the "now-current" preparation mailbox, and check out the current + * mail as the filled mailbox. + * @returns A reference to the current filled mailbox. + * + * @throw ClosureException When a closure event is encountered. (Any ClosureException derived type, see + * the ClosureException schedule for why these events get thrown.) + * + * @pre The mailbox must have some items in it. This is established for you by the internal locking system. + * @post The preparing mailbox is cleared, and the filled mailbox has the new mail. + * @invariant The filled mailbox will have some items in it, or a Alepha::ClosureException will be thrown. + * @invariant This function is not threadsafe in the presence of multiple consumers (fetchers). + */ + Item + pop() + { + if( pos == end( filled ) ) [[unlikely]] + { + error() << "We might have to hit the sync point." << std::endl; + if( mustFinish ) raiseFinished(); + Alepha::unique_lock lock( access ); + error() << "We have to hit the sync point." << std::endl; + interlock( lock ); + if( finished ) + { + if( pos == end( filled ) ) raiseFinished(); + mustFinish= true; + } + assert( &*pos != nullptr ); + assert( pos != end( filled ) ); + } + else error() << "We did not hit the sync point." << std::endl; + assert( pos != end( filled ) ); + assert( not filled.empty() ); + + assert( &*pos != nullptr ); + return std::move( *pos++ ); + } + + /*! + * @brief This function will add mail to the mailbox. + * + * @throw ClosureException When a closure event is encountered. (Any ClosureException derived type, see + * the ClosureException schedule for why these events get thrown.) + * + * @pre The mailbox must have room. This is established for you by the internal locking system. + * @post The preparing mailbox will have "item" as the last item added to this box. + * @invariant The mailbox will not be allowed to become overfull. + */ + void + push( Item item ) + { + if( weight > boxLimit ) + { + [[unlikely]]; + Alepha::unique_lock lock( access ); + interlock( lock ); + assertion( weight == 0 ); + } + assertion( weight <= boxLimit ); + + weight+= computeWeight( item ); + preparing.push_back( std::move( item ) ); + } + + /*! + * @brief This function will close the mailbox to any new mail, and deliver a closure + * event to the other side. + * + * @pre The mailbox must not be finished. + * @pre The mailbox will be put into the finished state. + */ + void + finish() + { + Alepha::unique_lock lock( access ); + assertion( not finished ); + finished= true; + if( suspended ) interlock( lock ); + else suspended= true; + } + }; } -#endif /*** ALEPHA_MAILBOX_HEADER ***/ - -/* - * vim:ts=3:sw=3:sts=3:sta:et:ft=cpp - */ +namespace Alepha::Hydrogen::Atomic::inline exports::inline Mailbox_m +{ + using namespace detail::Mailbox_m::exports; +} diff --git a/Atomic/Mailbox.test/0.cc b/Atomic/Mailbox.test/0.cc new file mode 100644 index 0000000..ae03dbe --- /dev/null +++ b/Atomic/Mailbox.test/0.cc @@ -0,0 +1,55 @@ +static_assert( __cplusplus > 2020'99 ); + +#include "../Mailbox.h" + +#include +#include + +static auto init= Alepha::Utility::enroll <=[] +{ + using namespace Alepha::Testing::exports; + using namespace Alepha::Testing::literals; + + "Smoke Test"_test <=[] + { + Alepha::Atomic::Mailbox< int > m( 1024 ); + + m.push( 42 ); + m.push( 42 ); + m.finish(); + + assert( m.pop() == 42 ); + assert( m.pop() == 42 ); + + try + { + m.pop(); + std::cerr << "We passed the last pop!" << std::endl; + assert( false ); + } + catch( const Alepha::Exception &cond ) + { + std::cerr << "We caught an exception." << std::endl; + const Alepha::Exception *const cond_p= &cond; + + assert( dynamic_cast< const Alepha::Atomic::MailboxFinishedException * >( cond_p ) ); + assert( dynamic_cast< const Alepha::FinishedException * >( cond_p ) ); + assert( dynamic_cast< const Alepha::Exception * >( cond_p ) ); + + assert( dynamic_cast< const Alepha::Atomic::MailboxFinishedCondition * >( cond_p ) ); + assert( dynamic_cast< const Alepha::FinishedCondition * >( cond_p ) ); + assert( dynamic_cast< const Alepha::Condition * >( cond_p ) ); + + + + assert( not dynamic_cast< const Alepha::Atomic::AnyTaggedMailboxFinishedException * >( cond_p ) ); + assert( not dynamic_cast< const Alepha::AnyTaggedFinishedException * >( cond_p ) ); + assert( not dynamic_cast< const Alepha::AnyTaggedException * >( cond_p ) ); + + assert( not dynamic_cast< const Alepha::Atomic::AnyTaggedMailboxFinishedCondition * >( cond_p ) ); + assert( not dynamic_cast< const Alepha::AnyTaggedFinishedCondition * >( cond_p ) ); + assert( not dynamic_cast< const Alepha::AnyTaggedCondition * >( cond_p ) ); + } + catch( ... ) { abort(); } + }; +}; diff --git a/Atomic/Mailbox.test/CMakeLists.txt b/Atomic/Mailbox.test/CMakeLists.txt new file mode 100644 index 0000000..b099603 --- /dev/null +++ b/Atomic/Mailbox.test/CMakeLists.txt @@ -0,0 +1 @@ +unit_test( 0 ) diff --git a/CMakeLists.txt b/CMakeLists.txt index 895f050..e18158b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,6 +20,7 @@ link_libraries( alepha ) # The subdirs to build add_subdirectory( Meta ) +add_subdirectory( Atomic ) add_subdirectory( Proof ) add_subdirectory( IOStreams ) add_subdirectory( Reflection )