From 72534321980bba56da4c7a70ca8fc9459311a92f Mon Sep 17 00:00:00 2001 From: ADAM David Alan Martin Date: Wed, 3 Jan 2024 23:57:31 -0500 Subject: [PATCH] Add a new mailbox alternative --- Atomic/DeadDrop.h | 91 +++++++++++++++++++++++++ Atomic/Dropbox.h | 167 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 258 insertions(+) create mode 100644 Atomic/DeadDrop.h create mode 100644 Atomic/Dropbox.h diff --git a/Atomic/DeadDrop.h b/Atomic/DeadDrop.h new file mode 100644 index 0000000..da06d6c --- /dev/null +++ b/Atomic/DeadDrop.h @@ -0,0 +1,91 @@ +static_assert( __cplusplus > 2020'99 ); + +#pragma once + +#include + +#include + +#include + +#include + +namespace Alepha::Hydrogen::Atomic ::detail:: DeadDrop_m +{ + inline namespace exports + { + template< typename Item > + class DeadDrop; + } + + template< typename Item > + class exports::DeadDrop + { + private: + Alepha::mutex access; + + struct Box + { + private: + Alepha::mutex *access= nullptr; + Alepha::condition_variable ready; + + Item original; + + Item *active= &original; + std::queue< Item * > next; + + Box *companion= nullptr; + + friend DeadDrop; + + void + setup( Alepha::mutex *const access, Box *const companion ) + { + this->access= access; + this->companion= companion; + } + + public: + Item & + acquireBox() + { + if( not active ) + { + [[unlikely]]; + std::unique_lock lock{ *access }; + ready.wait( lock, [&]{ return not next.empty(); } ); + active= next.front(); + next.pop(); + } + assert( active ); + return *active; + } + + void + releaseBox() + { + std::unique_lock lock{ *access }; + companion->next.push( active ); + active= nullptr; + companion->ready.notify_one(); + } + }; + + public: + Box producer; + Box consumer; + + explicit + DeadDrop() + { + producer.setup( &this->access, &consumer ); + consumer.setup( &this->access, &producer ); + } + }; +} + +namespace Alepha::Hydrogen::Atomic::inline exports::inline DeadDrop_m +{ + using namespace detail::DeadDrop_m::exports; +} diff --git a/Atomic/Dropbox.h b/Atomic/Dropbox.h new file mode 100644 index 0000000..a93babd --- /dev/null +++ b/Atomic/Dropbox.h @@ -0,0 +1,167 @@ +static_assert( __cplusplus > 2020'99 ); + +/*! + * @file Dropbox.h + * @brief Class which abstracts a "dropbox" metaphor, for threaded programming + * + * Copyright (C) 2024 Alepha Library. All rights reserved.
+ * @author ADAM David Alan Martin + */ + +#pragma once + +#include + +#include +#include + +#include +#include +#include +#include + +#include "DeadDrop.h" + +namespace Alepha::Hydrogen::Atomic ::detail:: Dropbox_m +{ + namespace C + { + const bool debug= false; + const bool debugInterlock= false or C::debug; + const bool debugPop= false or C::debug; + } + + inline namespace exports + { + template< typename > class Dropbox; + + using DropboxFinishedException= synthetic_exception< struct finished_exception, FinishedException >; + using AnyTaggedDropboxFinishedException= AnyTagged< DropboxFinishedException >; + template< typename tag > using TaggedDropboxFinishedException= Tagged< DropboxFinishedException, tag >; + + using DropboxFinishedCondition= synthetic_exception< struct finished_exception, FinishedCondition, DropboxFinishedException >; + using AnyTaggedDropboxFinishedCondition= AnyTagged< DropboxFinishedCondition >; + template< typename tag > using TaggedDropboxFinishedCondition= Tagged< DropboxFinishedCondition, tag >; + } + + template< typename T > + std::size_t + computeWeight( const T & ) + { + return sizeof( T ); + } + + /*! + * @brief The Dropbox class implements a Dropbox metaphor. Just like anonymous dead-drop boxes, + * the Dropbox class lets users add items to a box from a single producer thread + * and in a single consumer thread consume the contents of the mailbox. + * + * @tparam Item The Item type which is used in the dropbox containers. + * + * @note Dropboxes only support one consumer and one producer. + * + * @note Dropboxes only block on attempting to acquire the next `DeadDrop` to use after exhaustion. + * + * @invariant Dropboxes give out their contents in the order placed in. + * + * The Dropbox primitive is implemented as DeadDrop object which is rotated periodically. + * When a producer or consumer exhausts the current DeadDrop port, it `release`es it. When + * it needs a new DeadDrop port to actually use, it will `acquire` it. Only `acquire` blocks. + */ + template< typename Item > + class exports::Dropbox + { + public: + using FinishedException= synthetic_exception< struct finished_exception, DropboxFinishedException >; + //using AnyTaggedFinishedException= AnyTagged< FinishedException >; + //template< typename tag > using TaggedFinishedException= Tagged< FinishedException, tag >; + + using FinishedCondition= synthetic_exception< struct finished_exception, DropboxFinishedCondition, FinishedException >; + //using AnyTaggedFinishedCondition= AnyTagged< FinishedCondition >; + //template< typename tag > using TaggedFinishedCondition= Tagged< FinishedCondition, tag >; + + private: + const std::size_t boxLimit; + std::size_t weight= 0; + + struct Box + { + std::vector< Item > items; + std::vector< Item >::iterator pos= begin( items ); + std::exception_ptr notification; + }; + DeadDrop< Box > drops; + + void + sendoff() + { + auto &producer= drops.producer.acquireBox(); + producer.pos= begin( producer.items ); + weight= 0; + + drops.producer.releaseBox(); + } + + public: + explicit + Dropbox( const size_t lim ) + : boxLimit( lim ) + { + drops.consumer.releaseBox(); + } + + Item + pop() + { + auto &filled= drops.consumer.acquireBox(); + if( filled.pos == end( filled.items ) ) + { + [[unlikely]]; + auto notification= std::move( filled.notification ); + filled.notification= nullptr; + drops.consumer.releaseBox(); + std::rethrow_exception( std::move( notification ) ); + } + auto &rv= *filled.pos++; + if( filled.pos == end( filled.items ) and filled.notification == nullptr ) + { + drops.consumer.releaseBox(); + } + return std::move( rv ); + } + + bool wouldBlock() { return weight > boxLimit; } + + + void + push( Item item ) + { + if( wouldBlock() ) [[unlikely]] sendoff(); + assertion( weight <= boxLimit ); + + auto &preparing= drops.producer.acquireBox(); + + weight+= computeWeight( item ); + preparing.items.push_back( std::move( item ) ); + } + + void + notify( auto newNotification ) + { + auto &preparing= drops.producer.acquireBox(); + preparing.notification= std::make_exception_ptr( std::move( newNotification ) ); + sendoff(); + } + + void + finish() + { + notify( build_exception< FinishedCondition >( "" ) ); + } + }; +} + +namespace Alepha::Hydrogen::Atomic::inline exports::inline Dropbox_m +{ + using namespace detail::Dropbox_m::exports; +}