static_assert( __cplusplus > 2020'99 ); /*! * @file Mailbox.h * @brief Class which abstracts a "mailbox" metaphor, for threaded programming * * Copyright (C) 2010 Alepha Library. All rights reserved.
* @author ADAM David Alan Martin */ #pragma once #include #include #include #include #include #include #include namespace Alepha::Hydrogen::Atomic ::detail:: Mailbox_m { namespace C { const bool debug= false; const bool debugInterlock= false or C::debug; const bool debugPop= false or C::debug; } inline namespace exports { template< typename > class Mailbox; using MailboxFinishedException= synthetic_exception< struct finished_exception, FinishedException >; using AnyTaggedMailboxFinishedException= AnyTagged< MailboxFinishedException >; template< typename tag > using TaggedMailboxFinishedException= Tagged< MailboxFinishedException, tag >; using MailboxFinishedCondition= synthetic_exception< struct finished_exception, FinishedCondition, MailboxFinishedException >; using AnyTaggedMailboxFinishedCondition= AnyTagged< MailboxFinishedCondition >; template< typename tag > using TaggedMailboxFinishedCondition= Tagged< MailboxFinishedCondition, tag >; } template< typename T > std::size_t computeWeight( const T & ) { return sizeof( T ); } struct no_tag {}; /*! * @brief The Mailbox class implements a mailbox metaphor. Just like postal service mailboxes, * the mailbox class lets users add items to the box from a producer thread or group of threads, * and in a (single, for now) consumer thread, take the entire contents of the mailbox out, and * work with them. * * @tparam Item The Item type which is used in the mailbox containers. * * @note Mailboxes, for now, only support one consumer, and any number of producers. * * @invariant Mailboxes give out their contents in the order placed in. * * The Mailbox primitive is implemented as two "containers" which are swapped periodically by the * consumer, and continually filled by the producers. This swapping keeps the identity in principle, * but allows the user to access the contents of the "other side", in practice. The two mailbox * containers are: "preparation" and "filled". The consumer takes a whole "filled" mailbox out, and * replaces his current mailbox into the system for use as the new "preparation" box. The producer * continually inserts mail into the "preparation" mailbox until full. The consumer clears his mailbox * when returning it. */ template< typename Item > class exports::Mailbox { public: using FinishedException= synthetic_exception< struct finished_exception, MailboxFinishedException >; //using AnyTaggedFinishedException= AnyTagged< FinishedException >; //template< typename tag > using TaggedFinishedException= Tagged< FinishedException, tag >; using FinishedCondition= synthetic_exception< struct finished_exception, MailboxFinishedCondition, FinishedException >; //using AnyTaggedFinishedCondition= AnyTagged< FinishedCondition >; //template< typename tag > using TaggedFinishedCondition= Tagged< FinishedCondition, tag >; private: /*! @brief The maximum weight of "mail" items which can be placed into a "preparation" mailbox */ const std::size_t boxLimit; Alepha::mutex access; Alepha::Interlock interlockPoint; std::size_t weight= 0; std::vector< Item > preparing; std::vector< Item > filled; std::vector< Item >::iterator pos= begin( filled ); std::exception_ptr notification; bool finished= false; bool mustFinish= false; template< typename Lock > void awaken( Lock &lock ) { filled.clear(); weight= 0; using std::swap; swap( filled, preparing ); pos= begin( filled ); } void interlock( Alepha::unique_lock< Alepha::mutex > &lock ) { if( C::debugInterlock ) error() << "Interlock entered." << std::endl; interlockPoint.wait( lock, [&]{ awaken( lock ); } ); } void checkpoint( Alepha::unique_lock< Alepha::mutex > &lock ) { if( C::debugInterlock ) error() << "Checkpoint entered." << std::endl; interlockPoint.checkpoint( lock, [&]{ awaken( lock ); } ); } [[noreturn]] void raiseNotification() { auto notification_ptr= std::move( notification ); notification= nullptr; std::rethrow_exception( std::move( notification_ptr ) ); abort(); } [[noreturn]] void raiseFinished() { //if constexpr( std::is_same_v< tag, no_tag > ) throw build_exception< FinishedCondition >( "" ); //else throw TaggedFinishedCondition< tag >{}; abort(); } public: explicit inline Mailbox( const size_t lim ) : boxLimit( lim ) { assertion( pos == begin( filled ) ); } /*! * @todo Fix dox... * @brief This function will give back the "now-current" preparation mailbox, and check out the current * mail as the filled mailbox. * @returns A reference to the current filled mailbox. * * @throw ClosureException When a closure event is encountered. (Any ClosureException derived type, see * the ClosureException schedule for why these events get thrown.) * * @pre The mailbox must have some items in it. This is established for you by the internal locking system. * @post The preparing mailbox is cleared, and the filled mailbox has the new mail. * @invariant The filled mailbox will have some items in it, or a Alepha::ClosureException will be thrown. * @invariant This function is not threadsafe in the presence of multiple consumers (fetchers). */ Item pop() { if( C::debugPop ) error() << "Pop called (" << (void *) this << ")." << std::endl; if( pos == end( filled ) ) [[unlikely]] { if( C::debugPop ) error() << "We might have to hit the sync point." << std::endl; if( notification ) raiseNotification(); if( mustFinish ) raiseFinished(); Alepha::unique_lock lock( access ); if( C::debugPop ) error() << "We have to hit the sync point." << std::endl; interlock( lock ); if( notification and pos == end( filled ) ) raiseNotification(); if( finished ) { if( pos == end( filled ) ) raiseFinished(); mustFinish= true; } assertion( &*pos != nullptr ); assertion( pos != end( filled ) ); } else if( C::debugPop ) error() << "We did not hit the sync point." << std::endl; assertion( pos != end( filled ) ); assertion( not filled.empty() ); assertion( &*pos != nullptr ); return std::move( *pos++ ); } bool pushWouldBlock() const noexcept { return weight > boxLimit; } /*! * @brief This function will add mail to the mailbox. * * @throw ClosureException When a closure event is encountered. (Any ClosureException derived type, see * the ClosureException schedule for why these events get thrown.) * * @pre The mailbox must have room. This is established for you by the internal locking system. * @post The preparing mailbox will have "item" as the last item added to this box. * @invariant The mailbox will not be allowed to become overfull. */ void push( Item item ) { if( pushWouldBlock() ) { [[unlikely]]; Alepha::unique_lock lock( access ); interlock( lock ); assertion( weight == 0 ); } assertion( weight <= boxLimit ); weight+= computeWeight( item ); preparing.push_back( std::move( item ) ); } /*! * @brief This function will close the mailbox to any new mail, and deliver a closure * event to the other side. * * @pre The mailbox must not be finished. * @pre The mailbox will be put into the finished state. */ void finish() { Alepha::unique_lock lock( access ); assertion( not finished ); finished= true; checkpoint( lock ); } /*! * @brief This function will pump the mailbox, and deliver a custom notification * event to the other side, without waiting (last step). */ void finish( auto newNotification ) { auto notification_ptr= std::make_exception_ptr( std::move( newNotification ) ); Alepha::unique_lock lock( access ); notification= std::move( notification_ptr ); checkpoint( lock ); } /*! * @brief This function will pump the mailbox, and deliver a custom notification * event to the other side. */ void notify( auto newNotification ) { auto notification_ptr= std::make_exception_ptr( std::move( newNotification ) ); Alepha::unique_lock lock( access ); assertion( not finished ); notification= std::move( notification_ptr ); interlock( lock ); } }; } namespace Alepha::Hydrogen::Atomic::inline exports::inline Mailbox_m { using namespace detail::Mailbox_m::exports; }