forked from Alepha/Alepha
`Dropbox` and `Mailbox` should have roughly the same interface, but with different stall semantics. However, terminal operations don't wait: `finish( custom )` should be a handoff without a wait. `Dropbox`'s primary thread coordination works in terms of acquire/release semantics, in which `release` doesn't block, so this works naturally there. `Mailbox`, however, works in terms of the `Interlock` primitive, which allows for a terminal-case `checkpoint`. Therefore `finish( custom )` has to call the `checkpoint` API in `Mailbox`.
278 lines
9.0 KiB
C++
278 lines
9.0 KiB
C++
// Refuse to compile unless the language standard is newer than C++20: `__cplusplus` must exceed 202099.
static_assert( __cplusplus > 2020'99 );
|
|
|
|
/*!
|
|
* @file Mailbox.h
|
|
* @brief Class which abstracts a "mailbox" metaphor, for threaded programming
|
|
*
|
|
* Copyright (C) 2010 Alepha Library. All rights reserved. <BR>
|
|
* @author ADAM David Alan Martin
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <Alepha/Alepha.h>
|
|
|
|
#include <vector>
|
|
#include <exception>
|
|
|
|
#include <Alepha/Thread.h>
|
|
#include <Alepha/Exception.h>
|
|
#include <Alepha/assertion.h>
|
|
#include <Alepha/error.h>
|
|
|
|
namespace Alepha::Hydrogen::Atomic ::detail:: Mailbox_m
|
|
{
|
|
/*!
 * @brief Compile-time switches for the diagnostic trace output in this file.
 *
 * The flags are `inline constexpr` so that this header contributes a single definition of each
 * flag to the whole program, and so that each flag is guaranteed to be a constant expression.
 */
namespace C
{
	//! Master switch: when true, every category below is enabled as well.
	inline constexpr bool debug= false;

	//! Enables the "Interlock entered." / "Checkpoint entered." trace lines.
	inline constexpr bool debugInterlock= false or C::debug;

	//! Enables the trace lines emitted while popping from a mailbox.
	inline constexpr bool debugPop= false or C::debug;
}
|
|
|
|
inline namespace exports
|
|
{
|
|
template< typename > class Mailbox;
|
|
|
|
using MailboxFinishedException= synthetic_exception< struct finished_exception, FinishedException >;
|
|
using AnyTaggedMailboxFinishedException= AnyTagged< MailboxFinishedException >;
|
|
template< typename tag > using TaggedMailboxFinishedException= Tagged< MailboxFinishedException, tag >;
|
|
|
|
using MailboxFinishedCondition= synthetic_exception< struct finished_exception, FinishedCondition, MailboxFinishedException >;
|
|
using AnyTaggedMailboxFinishedCondition= AnyTagged< MailboxFinishedCondition >;
|
|
template< typename tag > using TaggedMailboxFinishedCondition= Tagged< MailboxFinishedCondition, tag >;
|
|
}
|
|
|
|
/*!
 * @brief Default "weight" of a mail item: the size, in bytes, of the item's static type.
 *
 * Only the object's own footprint is counted; storage the item owns indirectly (heap buffers,
 * for example) is not included.
 *
 * @param value The item being weighed.  Only its type is inspected; the value is never read.
 * @returns `sizeof` the item's type.
 */
template< typename Value >
std::size_t
computeWeight( const Value &value )
{
	// `sizeof` applied to a reference yields the size of the referenced type.
	return sizeof( value );
}
|
|
|
|
// Empty tag type.  Within this file it is only mentioned by the commented-out tagged-exception branch of `Mailbox::raiseFinished`.
struct no_tag {};
|
|
|
|
/*!
|
|
* @brief The Mailbox class implements a mailbox metaphor. Just like postal service mailboxes,
|
|
* the mailbox class lets users add items to the box from a producer thread or group of threads,
|
|
* and in a (single, for now) consumer thread, take the entire contents of the mailbox out, and
|
|
* work with them.
|
|
*
|
|
* @tparam Item The Item type which is used in the mailbox containers.
|
|
*
|
|
* @note Mailboxes, for now, only support one consumer, and any number of producers.
|
|
*
|
|
* @invariant Mailboxes give out their contents in the order placed in.
|
|
*
|
|
* The Mailbox primitive is implemented as two "containers" which are swapped periodically by the
|
|
* consumer, and continually filled by the producers. This swapping keeps the identity in principle,
|
|
* but allows the user to access the contents of the "other side", in practice. The two mailbox
|
|
* containers are: "preparation" and "filled". The consumer takes a whole "filled" mailbox out, and
|
|
* replaces his current mailbox into the system for use as the new "preparation" box. The producer
|
|
* continually inserts mail into the "preparation" mailbox until full. The consumer clears his mailbox
|
|
* when returning it.
|
|
*/
|
|
template< typename Item >
|
|
class exports::Mailbox
|
|
{
|
|
public:
|
|
using FinishedException= synthetic_exception< struct finished_exception, MailboxFinishedException >;
|
|
//using AnyTaggedFinishedException= AnyTagged< FinishedException >;
|
|
//template< typename tag > using TaggedFinishedException= Tagged< FinishedException, tag >;
|
|
|
|
using FinishedCondition= synthetic_exception< struct finished_exception, MailboxFinishedCondition, FinishedException >;
|
|
//using AnyTaggedFinishedCondition= AnyTagged< FinishedCondition >;
|
|
//template< typename tag > using TaggedFinishedCondition= Tagged< FinishedCondition, tag >;
|
|
|
|
private:
|
|
/*! @brief The maximum weight of "mail" items which can be placed into a "preparation" mailbox */
|
|
const std::size_t boxLimit;
|
|
|
|
Alepha::mutex access;
|
|
Alepha::Interlock interlockPoint;
|
|
|
|
std::size_t weight= 0;
|
|
std::vector< Item > preparing;
|
|
|
|
std::vector< Item > filled;
|
|
std::vector< Item >::iterator pos= begin( filled );
|
|
|
|
std::exception_ptr notification;
|
|
|
|
bool finished= false;
|
|
|
|
bool mustFinish= false;
|
|
|
|
template< typename Lock >
|
|
void
|
|
awaken( Lock &lock )
|
|
{
|
|
filled.clear();
|
|
weight= 0;
|
|
|
|
using std::swap;
|
|
swap( filled, preparing );
|
|
pos= begin( filled );
|
|
}
|
|
|
|
void
|
|
interlock( Alepha::unique_lock< Alepha::mutex > &lock )
|
|
{
|
|
if( C::debugInterlock ) error() << "Interlock entered." << std::endl;
|
|
interlockPoint.wait( lock, [&]{ awaken( lock ); } );
|
|
}
|
|
|
|
void
|
|
checkpoint( Alepha::unique_lock< Alepha::mutex > &lock )
|
|
{
|
|
if( C::debugInterlock ) error() << "Checkpoint entered." << std::endl;
|
|
interlockPoint.checkpoint( lock, [&]{ awaken( lock ); } );
|
|
}
|
|
|
|
[[noreturn]] void
|
|
raiseNotification()
|
|
{
|
|
auto notification_ptr= std::move( notification );
|
|
notification= nullptr;
|
|
std::rethrow_exception( std::move( notification_ptr ) );
|
|
abort();
|
|
}
|
|
|
|
[[noreturn]] void
|
|
raiseFinished()
|
|
{
|
|
//if constexpr( std::is_same_v< tag, no_tag > )
|
|
throw build_exception< FinishedCondition >( "" );
|
|
//else throw TaggedFinishedCondition< tag >{};
|
|
abort();
|
|
}
|
|
|
|
public:
|
|
explicit inline
|
|
Mailbox( const size_t lim )
|
|
: boxLimit( lim )
|
|
{
|
|
assertion( pos == begin( filled ) );
|
|
}
|
|
|
|
/*!
|
|
* @todo Fix dox...
|
|
* @brief This function will give back the "now-current" preparation mailbox, and check out the current
|
|
* mail as the filled mailbox.
|
|
* @returns A reference to the current filled mailbox.
|
|
*
|
|
* @throw ClosureException When a closure event is encountered. (Any ClosureException derived type, see
|
|
* the ClosureException schedule for why these events get thrown.)
|
|
*
|
|
* @pre The mailbox must have some items in it. This is established for you by the internal locking system.
|
|
* @post The preparing mailbox is cleared, and the filled mailbox has the new mail.
|
|
* @invariant The filled mailbox will have some items in it, or a Alepha::ClosureException will be thrown.
|
|
* @invariant This function is not threadsafe in the presence of multiple consumers (fetchers).
|
|
*/
|
|
Item
|
|
pop()
|
|
{
|
|
if( C::debugPop ) error() << "Pop called (" << (void *) this << ")." << std::endl;
|
|
if( pos == end( filled ) ) [[unlikely]]
|
|
{
|
|
if( C::debugPop ) error() << "We might have to hit the sync point." << std::endl;
|
|
if( notification ) raiseNotification();
|
|
if( mustFinish ) raiseFinished();
|
|
Alepha::unique_lock lock( access );
|
|
if( C::debugPop ) error() << "We have to hit the sync point." << std::endl;
|
|
interlock( lock );
|
|
if( notification and pos == end( filled ) ) raiseNotification();
|
|
if( finished )
|
|
{
|
|
if( pos == end( filled ) ) raiseFinished();
|
|
mustFinish= true;
|
|
}
|
|
assertion( &*pos != nullptr );
|
|
assertion( pos != end( filled ) );
|
|
}
|
|
else if( C::debugPop ) error() << "We did not hit the sync point." << std::endl;
|
|
assertion( pos != end( filled ) );
|
|
assertion( not filled.empty() );
|
|
|
|
assertion( &*pos != nullptr );
|
|
return std::move( *pos++ );
|
|
}
|
|
|
|
bool pushWouldBlock() const noexcept { return weight > boxLimit; }
|
|
|
|
/*!
|
|
* @brief This function will add mail to the mailbox.
|
|
*
|
|
* @throw ClosureException When a closure event is encountered. (Any ClosureException derived type, see
|
|
* the ClosureException schedule for why these events get thrown.)
|
|
*
|
|
* @pre The mailbox must have room. This is established for you by the internal locking system.
|
|
* @post The preparing mailbox will have "item" as the last item added to this box.
|
|
* @invariant The mailbox will not be allowed to become overfull.
|
|
*/
|
|
void
|
|
push( Item item )
|
|
{
|
|
if( pushWouldBlock() )
|
|
{
|
|
[[unlikely]];
|
|
Alepha::unique_lock lock( access );
|
|
interlock( lock );
|
|
assertion( weight == 0 );
|
|
}
|
|
assertion( weight <= boxLimit );
|
|
|
|
weight+= computeWeight( item );
|
|
preparing.push_back( std::move( item ) );
|
|
}
|
|
|
|
/*!
|
|
* @brief This function will close the mailbox to any new mail, and deliver a closure
|
|
* event to the other side.
|
|
*
|
|
* @pre The mailbox must not be finished.
|
|
* @pre The mailbox will be put into the finished state.
|
|
*/
|
|
void
|
|
finish()
|
|
{
|
|
Alepha::unique_lock lock( access );
|
|
assertion( not finished );
|
|
finished= true;
|
|
checkpoint( lock );
|
|
}
|
|
|
|
/*!
|
|
* @brief This function will pump the mailbox, and deliver a custom notification
|
|
* event to the other side, without waiting (last step).
|
|
*/
|
|
void
|
|
finish( auto newNotification )
|
|
{
|
|
auto notification_ptr= std::make_exception_ptr( std::move( newNotification ) );
|
|
Alepha::unique_lock lock( access );
|
|
notification= std::move( notification_ptr );
|
|
checkpoint( lock );
|
|
}
|
|
|
|
|
|
/*!
|
|
* @brief This function will pump the mailbox, and deliver a custom notification
|
|
* event to the other side.
|
|
*/
|
|
void
|
|
notify( auto newNotification )
|
|
{
|
|
auto notification_ptr= std::make_exception_ptr( std::move( newNotification ) );
|
|
Alepha::unique_lock lock( access );
|
|
assertion( not finished );
|
|
notification= std::move( notification_ptr );
|
|
interlock( lock );
|
|
}
|
|
};
|
|
}
|
|
|
|
namespace Alepha::Hydrogen::Atomic::inline exports::inline Mailbox_m
|
|
{
|
|
using namespace detail::Mailbox_m::exports;
|
|
}
|