1
0
forked from Alepha/Alepha

Add a new mailbox alternative

This commit is contained in:
2024-01-03 23:57:31 -05:00
parent 8ce3bd8bdf
commit 7253432198
2 changed files with 258 additions and 0 deletions

91
Atomic/DeadDrop.h Normal file
View File

@ -0,0 +1,91 @@
static_assert( __cplusplus > 2020'99 );
#pragma once
#include <Alepha/Alepha.h>
#include <cassert>
#include <queue>
#include <Alepha/Thread.h>
namespace Alepha::Hydrogen::Atomic ::detail:: DeadDrop_m
{
inline namespace exports
{
template< typename Item >
class DeadDrop;
}
template< typename Item >
class exports::DeadDrop
{
private:
Alepha::mutex access;
struct Box
{
private:
Alepha::mutex *access= nullptr;
Alepha::condition_variable ready;
Item original;
Item *active= &original;
std::queue< Item * > next;
Box *companion= nullptr;
friend DeadDrop;
void
setup( Alepha::mutex *const access, Box *const companion )
{
this->access= access;
this->companion= companion;
}
public:
Item &
acquireBox()
{
if( not active )
{
[[unlikely]];
std::unique_lock lock{ *access };
ready.wait( lock, [&]{ return not next.empty(); } );
active= next.front();
next.pop();
}
assert( active );
return *active;
}
void
releaseBox()
{
std::unique_lock lock{ *access };
companion->next.push( active );
active= nullptr;
companion->ready.notify_one();
}
};
public:
Box producer;
Box consumer;
explicit
DeadDrop()
{
producer.setup( &this->access, &consumer );
consumer.setup( &this->access, &producer );
}
};
}
namespace Alepha::Hydrogen::Atomic::inline exports::inline DeadDrop_m
{
using namespace detail::DeadDrop_m::exports;
}

167
Atomic/Dropbox.h Normal file
View File

@ -0,0 +1,167 @@
static_assert( __cplusplus > 2020'99 );
/*!
* @file Dropbox.h
* @brief Class which abstracts a "dropbox" metaphor, for threaded programming
*
* Copyright (C) 2024 Alepha Library. All rights reserved. <BR>
* @author ADAM David Alan Martin
*/
#pragma once
#include <Alepha/Alepha.h>
#include <vector>
#include <exception>
#include <Alepha/Thread.h>
#include <Alepha/Exception.h>
#include <Alepha/assertion.h>
#include <Alepha/error.h>
#include "DeadDrop.h"
namespace Alepha::Hydrogen::Atomic ::detail:: Dropbox_m
{
namespace C
{
const bool debug= false;
const bool debugInterlock= false or C::debug;
const bool debugPop= false or C::debug;
}
inline namespace exports
{
template< typename > class Dropbox;
using DropboxFinishedException= synthetic_exception< struct finished_exception, FinishedException >;
using AnyTaggedDropboxFinishedException= AnyTagged< DropboxFinishedException >;
template< typename tag > using TaggedDropboxFinishedException= Tagged< DropboxFinishedException, tag >;
using DropboxFinishedCondition= synthetic_exception< struct finished_exception, FinishedCondition, DropboxFinishedException >;
using AnyTaggedDropboxFinishedCondition= AnyTagged< DropboxFinishedCondition >;
template< typename tag > using TaggedDropboxFinishedCondition= Tagged< DropboxFinishedCondition, tag >;
}
template< typename T >
std::size_t
computeWeight( const T & )
{
return sizeof( T );
}
/*!
* @brief The Dropbox class implements a Dropbox metaphor. Just like anonymous dead-drop boxes,
* the Dropbox class lets users add items to a box from a single producer thread
* and in a single consumer thread consume the contents of the mailbox.
*
* @tparam Item The Item type which is used in the dropbox containers.
*
* @note Dropboxes only support one consumer and one producer.
*
* @note Dropboxes only block on attempting to acquire the next `DeadDrop` to use after exhaustion.
*
* @invariant Dropboxes give out their contents in the order placed in.
*
* The Dropbox primitive is implemented as DeadDrop object which is rotated periodically.
* When a producer or consumer exhausts the current DeadDrop port, it `release`es it. When
* it needs a new DeadDrop port to actually use, it will `acquire` it. Only `acquire` blocks.
*/
template< typename Item >
class exports::Dropbox
{
public:
using FinishedException= synthetic_exception< struct finished_exception, DropboxFinishedException >;
//using AnyTaggedFinishedException= AnyTagged< FinishedException >;
//template< typename tag > using TaggedFinishedException= Tagged< FinishedException, tag >;
using FinishedCondition= synthetic_exception< struct finished_exception, DropboxFinishedCondition, FinishedException >;
//using AnyTaggedFinishedCondition= AnyTagged< FinishedCondition >;
//template< typename tag > using TaggedFinishedCondition= Tagged< FinishedCondition, tag >;
private:
const std::size_t boxLimit;
std::size_t weight= 0;
struct Box
{
std::vector< Item > items;
std::vector< Item >::iterator pos= begin( items );
std::exception_ptr notification;
};
DeadDrop< Box > drops;
void
sendoff()
{
auto &producer= drops.producer.acquireBox();
producer.pos= begin( producer.items );
weight= 0;
drops.producer.releaseBox();
}
public:
explicit
Dropbox( const size_t lim )
: boxLimit( lim )
{
drops.consumer.releaseBox();
}
Item
pop()
{
auto &filled= drops.consumer.acquireBox();
if( filled.pos == end( filled.items ) )
{
[[unlikely]];
auto notification= std::move( filled.notification );
filled.notification= nullptr;
drops.consumer.releaseBox();
std::rethrow_exception( std::move( notification ) );
}
auto &rv= *filled.pos++;
if( filled.pos == end( filled.items ) and filled.notification == nullptr )
{
drops.consumer.releaseBox();
}
return std::move( rv );
}
bool wouldBlock() { return weight > boxLimit; }
void
push( Item item )
{
if( wouldBlock() ) [[unlikely]] sendoff();
assertion( weight <= boxLimit );
auto &preparing= drops.producer.acquireBox();
weight+= computeWeight( item );
preparing.items.push_back( std::move( item ) );
}
void
notify( auto newNotification )
{
auto &preparing= drops.producer.acquireBox();
preparing.notification= std::make_exception_ptr( std::move( newNotification ) );
sendoff();
}
void
finish()
{
notify( build_exception< FinishedCondition >( "" ) );
}
};
}
namespace Alepha::Hydrogen::Atomic::inline exports::inline Dropbox_m
{
using namespace detail::Dropbox_m::exports;
}