From adbbdf940fa6e5bf7eba4cf2ac932a51cbe28381 Mon Sep 17 00:00:00 2001 From: ADAM David Alan Martin Date: Wed, 8 Nov 2023 00:22:06 -0500 Subject: [PATCH] I found the original `Mailbox` implementation from over a decade ago. --- Atomic/Mailbox.h | 193 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 Atomic/Mailbox.h diff --git a/Atomic/Mailbox.h b/Atomic/Mailbox.h new file mode 100644 index 0000000..93733c5 --- /dev/null +++ b/Atomic/Mailbox.h @@ -0,0 +1,193 @@ +/*! + * @file Mailbox.h + * @brief Class which abstracts a "mailbox" metaphor, for threaded programming + * + * Copyright (C) 2010 Alepha Library. All rights reserved.
+ * @author ADAM David Alan Martin + */ + +#ifndef ALEPHA_MAILBOX_HEADER +#define ALEPHA_MAILBOX_HEADER + +#include + +#include + +#include +#include + +namespace Alepha +{ + namespace Atomic + { + /*! + * @brief The Mailbox class implements a mailbox metaphor. Just like postal service mailboxes, + * the mailbox class lets users add items to the box from a producer thread or group of threads, + * and in a (single, for now) consumer thread, take the entire contents of the mailbox out, and + * work with them. + * + * @tparam Item The Item type which is used in the mailbox containers. + * + * @note Mailboxes, for now, only support one consumer, and any number of producers. + * + * @invariant Mailboxes give out their contents in the order placed in. + * + * The Mailbox primitive is implemented as two "containers" which are swapped periodically by the + * consumer, and continually filled by the producers. This swapping keeps the identity in principle, + * but allows the user to access the contents of the "other side", in practice. The two mailbox + * containers are: "preparation" and "filled". The consumer takes a whole "filled" mailbox out, and + * replaces his current mailbox into the system for use as the new "preparation" box. The producer + * continually inserts mail into the "preparation" mailbox until full. The consumer clears his mailbox + * when returning it. + */ + template< typename Item > + class Mailbox + { + private: + /*! @brief The maximum number of "mail" items which can be placed into a "preparation" mailbox */ + const size_t boxLimit; + + const size_t minSwapFill; + + boost::mutex mailboxAccess; + + boost::condition boxFull; + boost::condition boxEmpty; + + std::vector< Item > preparing; + std::vector< Item > filled; + + bool finished; + bool terminated; + + bool + testTerminated() const + { + if( terminated ) throw Alepha::TerminatedException(); + return true; + } + + bool + testFinished() const + { + if( finished ) throw Alepha::FinishedException(); + return true; + } + + public: + explicit inline + Mailbox( const size_t lim, const size_t min= 1 ) + : boxLimit( lim ), minSwapFill( min ), finished( false ), terminated( false ) + { + this->preparing.reserve( boxLimit ); + this->filled.reserve( boxLimit ); + } + + /*! + * @brief This function will give back the "now-current" preparation mailbox, and check out the current + * mail as the filled mailbox. + * @returns A reference to the current filled mailbox. + * + * @throw ClosureException When a closure event is encountered. (Any ClosureException derived type, see + * the ClosureException schedule for why these events get thrown.) + * + * @pre The mailbox must have some items in it. This is established for you by the internal locking system. + * @post The preparing mailbox is cleared, and the filled mailbox has the new mail. + * @invariant The filled mailbox will have some items in it, or a Alepha::ClosureException will be thrown. + * @invariant This function is not threadsafe in the presence of multiple consumers (fetchers). + */ + std::vector< Item > & + fetchItems() + { + boost::this_thread::interruption_point(); + /* Our mailbox is done, so let's empty it. (Don't hold the lock during this, to reduce contention.) */ + this->filled.clear(); + + ALEPHA_TEMPLATE_NAMED_USAGE( lock, this->mailboxAccess ) + { + /* Make sure that there's mail we can get. */ + while( this->testTerminated() && ( this->preparing.size() < minSwapFill ) ) + //while( ( this->testTerminated() ) && ( this->preparing.empty() ) ) + { + //this->testFinished(); + if( this->finished && !this->preparing.empty() ) + { + break; + } + this->testFinished(); + this->boxFull.wait( lock ); + } + assert( !this->preparing.empty() ); + + /* Exchange our mailbox with the preparation box. And give it back. */ + std::swap( this->filled, this->preparing ); + this->boxEmpty.notify_all(); + } + return this->filled; + } + + /*! + * @brief This function will add mail to the mailbox. + * + * @throw ClosureException When a closure event is encountered. (Any ClosureException derived type, see + * the ClosureException schedule for why these events get thrown.) + * + * @pre The mailbox must have room. This is established for you by the internal locking system. + * @post The preparing mailbox will have "item" as the last item added to this box. + * @invariant The mailbox will not be allowed to become overfull. + */ + void + push_back( const Item &item ) + { + boost::this_thread::interruption_point(); + ALEPHA_TEMPLATE_NAMED_USAGE( lock, this->mailboxAccess ) + { + /* Make sure we can put the mail in */ + while( ( this->testTerminated() ) && ( this->preparing.size() == this->boxLimit ) ) + { + this->boxEmpty.wait( lock ); + } + assert( !this->preparing.size() < this->boxLimit ); + + /* Put the item into the mailbox. */ + this->preparing.push_back( item ); + this->boxFull.notify_one(); + } + } + + /*! + * @brief This function will close the mailbox to any new mail, and deliver a closure + * event to the other side. + * + * @pre The mailbox must not be finished. + * @pre The mailbox will be put into the finished state. + */ + void + close() + { + ALEPHA_TEMPLATE_SCOPED_USAGE( this->mailboxAccess ) + { + this->finished= true; + this->boxFull.notify_all(); + } + } + + void + terminate() + { + ALEPHA_TEMPLATE_SCOPED_USAGE( this->mailboxAccess ) + { + this->terminated= true; + this->boxEmpty.notify_all(); + this->boxFull.notify_all(); + } + } + }; + } +} + +#endif /*** ALEPHA_MAILBOX_HEADER ***/ + +/* + * vim:ts=3:sw=3:sts=3:sta:et:ft=cpp + */