1
0
forked from Alepha/Alepha

I found the original Mailbox implementation from over a decade ago.

This commit is contained in:
2023-11-08 00:22:06 -05:00
parent d1157cabf7
commit adbbdf940f

193
Atomic/Mailbox.h Normal file
View File

@ -0,0 +1,193 @@
/*!
* @file Mailbox.h
* @brief Class which abstracts a "mailbox" metaphor, for threaded programming
*
* Copyright (C) 2010 Alepha Library. All rights reserved. <BR>
* @author ADAM David Alan Martin
*/
#ifndef ALEPHA_MAILBOX_HEADER
#define ALEPHA_MAILBOX_HEADER
#include <vector>
#include <boost/thread.hpp>
#include <Alepha/ScopedUsage.h>
#include <Alepha/Exceptions.h>
namespace Alepha
{
namespace Atomic
{
/*!
* @brief The Mailbox class implements a mailbox metaphor. Just like postal service mailboxes,
* the mailbox class lets users add items to the box from a producer thread or group of threads,
* and in a (single, for now) consumer thread, take the entire contents of the mailbox out, and
* work with them.
*
* @tparam Item The Item type which is used in the mailbox containers.
*
* @note Mailboxes, for now, only support one consumer, and any number of producers.
*
* @invariant Mailboxes give out their contents in the order placed in.
*
* The Mailbox primitive is implemented as two "containers" which are swapped periodically by the
* consumer, and continually filled by the producers. This swapping keeps the identity in principle,
* but allows the user to access the contents of the "other side", in practice. The two mailbox
* containers are: "preparation" and "filled". The consumer takes a whole "filled" mailbox out, and
* replaces his current mailbox into the system for use as the new "preparation" box. The producer
* continually inserts mail into the "preparation" mailbox until full. The consumer clears his mailbox
* when returning it.
*/
template< typename Item >
class Mailbox
{
private:
/*! @brief The maximum number of "mail" items which can be placed into a "preparation" mailbox */
const size_t boxLimit;
const size_t minSwapFill;
boost::mutex mailboxAccess;
boost::condition boxFull;
boost::condition boxEmpty;
std::vector< Item > preparing;
std::vector< Item > filled;
bool finished;
bool terminated;
bool
testTerminated() const
{
if( terminated ) throw Alepha::TerminatedException();
return true;
}
bool
testFinished() const
{
if( finished ) throw Alepha::FinishedException();
return true;
}
public:
explicit inline
Mailbox( const size_t lim, const size_t min= 1 )
: boxLimit( lim ), minSwapFill( min ), finished( false ), terminated( false )
{
this->preparing.reserve( boxLimit );
this->filled.reserve( boxLimit );
}
/*!
* @brief This function will give back the "now-current" preparation mailbox, and check out the current
* mail as the filled mailbox.
* @returns A reference to the current filled mailbox.
*
* @throw ClosureException When a closure event is encountered. (Any ClosureException derived type, see
* the ClosureException schedule for why these events get thrown.)
*
* @pre The mailbox must have some items in it. This is established for you by the internal locking system.
* @post The preparing mailbox is cleared, and the filled mailbox has the new mail.
* @invariant The filled mailbox will have some items in it, or a Alepha::ClosureException will be thrown.
* @invariant This function is not threadsafe in the presence of multiple consumers (fetchers).
*/
std::vector< Item > &
fetchItems()
{
boost::this_thread::interruption_point();
/* Our mailbox is done, so let's empty it. (Don't hold the lock during this, to reduce contention.) */
this->filled.clear();
ALEPHA_TEMPLATE_NAMED_USAGE( lock, this->mailboxAccess )
{
/* Make sure that there's mail we can get. */
while( this->testTerminated() && ( this->preparing.size() < minSwapFill ) )
//while( ( this->testTerminated() ) && ( this->preparing.empty() ) )
{
//this->testFinished();
if( this->finished && !this->preparing.empty() )
{
break;
}
this->testFinished();
this->boxFull.wait( lock );
}
assert( !this->preparing.empty() );
/* Exchange our mailbox with the preparation box. And give it back. */
std::swap( this->filled, this->preparing );
this->boxEmpty.notify_all();
}
return this->filled;
}
/*!
* @brief This function will add mail to the mailbox.
*
* @throw ClosureException When a closure event is encountered. (Any ClosureException derived type, see
* the ClosureException schedule for why these events get thrown.)
*
* @pre The mailbox must have room. This is established for you by the internal locking system.
* @post The preparing mailbox will have "item" as the last item added to this box.
* @invariant The mailbox will not be allowed to become overfull.
*/
void
push_back( const Item &item )
{
boost::this_thread::interruption_point();
ALEPHA_TEMPLATE_NAMED_USAGE( lock, this->mailboxAccess )
{
/* Make sure we can put the mail in */
while( ( this->testTerminated() ) && ( this->preparing.size() == this->boxLimit ) )
{
this->boxEmpty.wait( lock );
}
assert( !this->preparing.size() < this->boxLimit );
/* Put the item into the mailbox. */
this->preparing.push_back( item );
this->boxFull.notify_one();
}
}
/*!
* @brief This function will close the mailbox to any new mail, and deliver a closure
* event to the other side.
*
* @pre The mailbox must not be finished.
* @pre The mailbox will be put into the finished state.
*/
void
close()
{
ALEPHA_TEMPLATE_SCOPED_USAGE( this->mailboxAccess )
{
this->finished= true;
this->boxFull.notify_all();
}
}
void
terminate()
{
ALEPHA_TEMPLATE_SCOPED_USAGE( this->mailboxAccess )
{
this->terminated= true;
this->boxEmpty.notify_all();
this->boxFull.notify_all();
}
}
};
}
}
#endif /*** ALEPHA_MAILBOX_HEADER ***/
/*
* vim:ts=3:sw=3:sts=3:sta:et:ft=cpp
*/