1
0
forked from Alepha/Alepha

Mailbox has been modernized.

This commit is contained in:
2023-11-16 07:32:56 -05:00
parent 4dac28a2c3
commit 4426fc2857
5 changed files with 249 additions and 163 deletions

1
Atomic/CMakeLists.txt Normal file
View File

@ -0,0 +1 @@
add_subdirectory( Mailbox.test )

View File

@ -1,3 +1,5 @@
static_assert( __cplusplus > 2020'99 );
/*!
* @file Mailbox.h
* @brief Class which abstracts a "mailbox" metaphor, for threaded programming
@ -6,20 +8,39 @@
* @author ADAM David Alan Martin
*/
#ifndef ALEPHA_MAILBOX_HEADER
#define ALEPHA_MAILBOX_HEADER
#include <Alepha/Alepha.h>
#include <vector>
#include <boost/thread.hpp>
#include <Alepha/Thread.h>
#include <Alepha/Exception.h>
#include <Alepha/assertion.h>
#include <Alepha/Event.h>
#include <Alepha/ScopedUsage.h>
#include <Alepha/Exceptions.h>
namespace Alepha::Hydrogen::Atomic ::detail:: Mailbox_m
{
inline namespace exports
{
template< typename > class Mailbox;
namespace Alepha
{
namespace Atomic
using MailboxFinishedException= synthetic_exception< struct finished_exception, FinishedException >;
using AnyTaggedMailboxFinishedException= AnyTagged< MailboxFinishedException >;
template< typename tag > using TaggedMailboxFinishedException= Tagged< MailboxFinishedException, tag >;
using MailboxFinishedCondition= synthetic_exception< struct finished_exception, FinishedCondition, MailboxFinishedException >;
using AnyTaggedMailboxFinishedCondition= AnyTagged< MailboxFinishedCondition >;
template< typename tag > using TaggedMailboxFinishedCondition= Tagged< MailboxFinishedCondition, tag >;
}
template< typename T >
std::size_t
computeWeight( const T & )
{
return sizeof( T );
}
struct no_tag {};
/*!
* @brief The Mailbox class implements a mailbox metaphor. Just like postal service mailboxes,
* the mailbox class lets users add items to the box from a producer thread or group of threads,
@ -41,49 +62,76 @@ namespace Alepha
* when returning it.
*/
template< typename Item >
class Mailbox
class exports::Mailbox
{
public:
using FinishedException= synthetic_exception< struct finished_exception, MailboxFinishedException >;
//using AnyTaggedFinishedException= AnyTagged< FinishedException >;
//template< typename tag > using TaggedFinishedException= Tagged< FinishedException, tag >;
using FinishedCondition= synthetic_exception< struct finished_exception, MailboxFinishedCondition, FinishedException >;
//using AnyTaggedFinishedCondition= AnyTagged< FinishedCondition >;
//template< typename tag > using TaggedFinishedCondition= Tagged< FinishedCondition, tag >;
private:
/*! @brief The maximum number of "mail" items which can be placed into a "preparation" mailbox */
const size_t boxLimit;
/*! @brief The maximum weight of "mail" items which can be placed into a "preparation" mailbox */
const std::size_t boxLimit;
const size_t minSwapFill;
boost::mutex mailboxAccess;
boost::condition boxFull;
boost::condition boxEmpty;
Alepha::mutex access;
Alepha::condition_variable ready;
bool suspended= false;
std::size_t weight= 0;
std::vector< Item > preparing;
std::vector< Item > filled;
std::vector< Item >::iterator pos= begin( filled );
bool finished;
bool terminated;
bool finished= false;
bool
testTerminated() const
bool mustFinish= false;
template< typename Lock >
void
awaken( Lock &lock )
{
if( terminated ) throw Alepha::TerminatedException();
return true;
swap( filled, preparing );
pos= begin( filled );
preparing.clear();
suspended= false;
weight= 0;
ready.notify_one();
}
bool
testFinished() const
template< typename Lock >
void
interlock( Lock &lock )
{
if( finished ) throw Alepha::FinishedException();
return true;
error() << "Interlock entered." << std::endl;
if( suspended ) awaken( lock );
else
{
suspended= true;
while( suspended ) ready.wait( lock );
}
}
[[noreturn]] void
raiseFinished()
{
//if constexpr( std::is_same_v< tag, no_tag > )
throw build_exception< FinishedCondition >( "" );
//else throw TaggedFinishedCondition< tag >{};
abort();
}
public:
explicit inline
Mailbox( const size_t lim, const size_t min= 1 )
: boxLimit( lim ), minSwapFill( min ), finished( false ), terminated( false )
{
this->preparing.reserve( boxLimit );
this->filled.reserve( boxLimit );
}
Mailbox( const size_t lim )
: boxLimit( lim ) {}
/*!
* @todo Fix dox...
* @brief This function will give back the "now-current" preparation mailbox, and check out the current
* mail as the filled mailbox.
* @returns A reference to the current filled mailbox.
@ -96,34 +144,30 @@ namespace Alepha
* @invariant The filled mailbox will have some items in it, or a Alepha::ClosureException will be thrown.
* @invariant This function is not threadsafe in the presence of multiple consumers (fetchers).
*/
std::vector< Item > &
fetchItems()
Item
pop()
{
boost::this_thread::interruption_point();
/* Our mailbox is done, so let's empty it. (Don't hold the lock during this, to reduce contention.) */
this->filled.clear();
if( pos == end( filled ) ) [[unlikely]]
{
error() << "We might have to hit the sync point." << std::endl;
if( mustFinish ) raiseFinished();
Alepha::unique_lock lock( access );
error() << "We have to hit the sync point." << std::endl;
interlock( lock );
if( finished )
{
if( pos == end( filled ) ) raiseFinished();
mustFinish= true;
}
assert( &*pos != nullptr );
assert( pos != end( filled ) );
}
else error() << "We did not hit the sync point." << std::endl;
assert( pos != end( filled ) );
assert( not filled.empty() );
ALEPHA_TEMPLATE_NAMED_USAGE( lock, this->mailboxAccess )
{
/* Make sure that there's mail we can get. */
while( this->testTerminated() && ( this->preparing.size() < minSwapFill ) )
//while( ( this->testTerminated() ) && ( this->preparing.empty() ) )
{
//this->testFinished();
if( this->finished && !this->preparing.empty() )
{
break;
}
this->testFinished();
this->boxFull.wait( lock );
}
assert( !this->preparing.empty() );
/* Exchange our mailbox with the preparation box. And give it back. */
std::swap( this->filled, this->preparing );
this->boxEmpty.notify_all();
}
return this->filled;
assert( &*pos != nullptr );
return std::move( *pos++ );
}
/*!
@ -137,22 +181,19 @@ namespace Alepha
* @invariant The mailbox will not be allowed to become overfull.
*/
void
push_back( const Item &item )
push( Item item )
{
boost::this_thread::interruption_point();
ALEPHA_TEMPLATE_NAMED_USAGE( lock, this->mailboxAccess )
if( weight > boxLimit )
{
/* Make sure we can put the mail in */
while( ( this->testTerminated() ) && ( this->preparing.size() == this->boxLimit ) )
{
this->boxEmpty.wait( lock );
[[unlikely]];
Alepha::unique_lock lock( access );
interlock( lock );
assertion( weight == 0 );
}
assert( !this->preparing.size() < this->boxLimit );
assertion( weight <= boxLimit );
/* Put the item into the mailbox. */
this->preparing.push_back( item );
this->boxFull.notify_one();
}
weight+= computeWeight( item );
preparing.push_back( std::move( item ) );
}
/*!
@ -163,31 +204,18 @@ namespace Alepha
* @pre The mailbox will be put into the finished state.
*/
void
close()
finish()
{
ALEPHA_TEMPLATE_SCOPED_USAGE( this->mailboxAccess )
{
this->finished= true;
this->boxFull.notify_all();
}
}
void
terminate()
{
ALEPHA_TEMPLATE_SCOPED_USAGE( this->mailboxAccess )
{
this->terminated= true;
this->boxEmpty.notify_all();
this->boxFull.notify_all();
}
Alepha::unique_lock lock( access );
assertion( not finished );
finished= true;
if( suspended ) interlock( lock );
else suspended= true;
}
};
}
namespace Alepha::Hydrogen::Atomic::inline exports::inline Mailbox_m
{
using namespace detail::Mailbox_m::exports;
}
#endif /*** ALEPHA_MAILBOX_HEADER ***/
/*
* vim:ts=3:sw=3:sts=3:sta:et:ft=cpp
*/

55
Atomic/Mailbox.test/0.cc Normal file
View File

@ -0,0 +1,55 @@
static_assert( __cplusplus > 2020'99 );
#include "../Mailbox.h"
#include <Alepha/Testing/test.h>
#include <Alepha/Testing/TableTest.h>
static auto init= Alepha::Utility::enroll <=[]
{
using namespace Alepha::Testing::exports;
using namespace Alepha::Testing::literals;
"Smoke Test"_test <=[]
{
Alepha::Atomic::Mailbox< int > m( 1024 );
m.push( 42 );
m.push( 42 );
m.finish();
assert( m.pop() == 42 );
assert( m.pop() == 42 );
try
{
m.pop();
std::cerr << "We passed the last pop!" << std::endl;
assert( false );
}
catch( const Alepha::Exception &cond )
{
std::cerr << "We caught an exception." << std::endl;
const Alepha::Exception *const cond_p= &cond;
assert( dynamic_cast< const Alepha::Atomic::MailboxFinishedException * >( cond_p ) );
assert( dynamic_cast< const Alepha::FinishedException * >( cond_p ) );
assert( dynamic_cast< const Alepha::Exception * >( cond_p ) );
assert( dynamic_cast< const Alepha::Atomic::MailboxFinishedCondition * >( cond_p ) );
assert( dynamic_cast< const Alepha::FinishedCondition * >( cond_p ) );
assert( dynamic_cast< const Alepha::Condition * >( cond_p ) );
assert( not dynamic_cast< const Alepha::Atomic::AnyTaggedMailboxFinishedException * >( cond_p ) );
assert( not dynamic_cast< const Alepha::AnyTaggedFinishedException * >( cond_p ) );
assert( not dynamic_cast< const Alepha::AnyTaggedException * >( cond_p ) );
assert( not dynamic_cast< const Alepha::Atomic::AnyTaggedMailboxFinishedCondition * >( cond_p ) );
assert( not dynamic_cast< const Alepha::AnyTaggedFinishedCondition * >( cond_p ) );
assert( not dynamic_cast< const Alepha::AnyTaggedCondition * >( cond_p ) );
}
catch( ... ) { abort(); }
};
};

View File

@ -0,0 +1 @@
unit_test( 0 )

View File

@ -20,6 +20,7 @@ link_libraries( alepha )
# The subdirs to build
add_subdirectory( Meta )
add_subdirectory( Atomic )
add_subdirectory( Proof )
add_subdirectory( IOStreams )
add_subdirectory( Reflection )