forked from Alepha/Alepha
254 lines
6.9 KiB
C++
254 lines
6.9 KiB
C++
static_assert( __cplusplus > 2020'99 );
|
|
|
|
/*!
|
|
* @file Dropbox.h
|
|
* @brief Class which abstracts a "dropbox" metaphor, for threaded programming
|
|
*
|
|
* Copyright (C) 2024 Alepha Library. All rights reserved. <BR>
|
|
* @author ADAM David Alan Martin
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <Alepha/Alepha.h>
|
|
|
|
#include <vector>
|
|
#include <exception>
|
|
|
|
#include <Alepha/Thread.h>
|
|
#include <Alepha/Exception.h>
|
|
#include <Alepha/assertion.h>
|
|
#include <Alepha/error.h>
|
|
|
|
#include "DeadDrop.h"
|
|
|
|
namespace Alepha::Hydrogen::Atomic ::detail:: Dropbox_m
|
|
{
|
|
namespace C
|
|
{
|
|
const bool debug= false;
|
|
const bool debugInterlock= false or C::debug;
|
|
const bool debugPop= false or C::debug;
|
|
const bool debugSSO= false or C::debug;
|
|
}
|
|
|
|
inline bool
|
|
isSSO( const std::string &s )
|
|
{
|
|
const auto base= reinterpret_cast< const char * >( &s );
|
|
|
|
if constexpr( C::debugSSO )
|
|
{
|
|
error() << "Base is: " << (void *) base << std::endl;
|
|
error() << "Data is: " << (void *) s.data() << std::endl;
|
|
error() << "Storage base is: " << sizeof( s ) << std::endl;
|
|
}
|
|
|
|
const bool result= std::less_equal{}( base, s.data() )
|
|
and std::less{}( s.data(), base + sizeof( s ) );
|
|
|
|
if( C::debugSSO and result ) error() << "Found an SSO" << std::endl;
|
|
return result;
|
|
}
|
|
|
|
inline namespace exports
|
|
{
|
|
template< typename > class Dropbox;
|
|
|
|
using DropboxFinishedException= synthetic_exception< struct finished_exception, FinishedException >;
|
|
using AnyTaggedDropboxFinishedException= AnyTagged< DropboxFinishedException >;
|
|
template< typename tag > using TaggedDropboxFinishedException= Tagged< DropboxFinishedException, tag >;
|
|
|
|
using DropboxFinishedCondition= synthetic_exception< struct finished_exception, FinishedCondition, DropboxFinishedException >;
|
|
using AnyTaggedDropboxFinishedCondition= AnyTagged< DropboxFinishedCondition >;
|
|
template< typename tag > using TaggedDropboxFinishedCondition= Tagged< DropboxFinishedCondition, tag >;
|
|
|
|
|
|
// TODO: Move these to somewhere more sensible...
|
|
template< typename T >
|
|
std::size_t
|
|
computeWeight( const T & )
|
|
{
|
|
return sizeof( T );
|
|
}
|
|
|
|
inline std::size_t
|
|
computeWeight( const std::string &s )
|
|
{
|
|
if( isSSO( s ) ) return sizeof( s );
|
|
return sizeof( s ) + s.capacity();
|
|
}
|
|
|
|
template< typename T >
|
|
std::size_t
|
|
computeWeight( const std::vector< T > &v )
|
|
{
|
|
std::size_t sz= sizeof( v ) + sizeof( T ) * ( v.capacity() - v.size() );
|
|
|
|
for( const auto &i: v )
|
|
{
|
|
sz+= computeWeight( sz );
|
|
}
|
|
|
|
return sz;
|
|
}
|
|
|
|
std::size_t
|
|
computeWeight( const Concepts::Aggregate auto &agg )
|
|
{
|
|
std::size_t rv= 0;
|
|
|
|
// This won't account for slack space... but I have to separate out
|
|
// the `sizeof` checks from the way `computeWeight` works. It's
|
|
// "good enough" for now...
|
|
template_for( agg ) <=[&]( const auto &element )
|
|
{
|
|
rv+= computeWeight( element );
|
|
};
|
|
|
|
return rv;
|
|
}
|
|
}
|
|
|
|
/*!
|
|
* @brief The Dropbox class implements a Dropbox metaphor. Just like anonymous dead-drop boxes,
|
|
* the Dropbox class lets users add items to a box from a single producer thread
|
|
* and in a single consumer thread consume the contents of the mailbox.
|
|
*
|
|
* @tparam Item The Item type which is used in the dropbox containers.
|
|
*
|
|
* @note Dropboxes only support one consumer and one producer.
|
|
*
|
|
* @note Dropboxes only block on attempting to acquire the next `DeadDrop` to use after exhaustion.
|
|
*
|
|
* @invariant Dropboxes give out their contents in the order placed in.
|
|
*
|
|
* The Dropbox primitive is implemented as DeadDrop object which is rotated periodically.
|
|
* When a producer or consumer exhausts the current DeadDrop port, it `release`es it. When
|
|
* it needs a new DeadDrop port to actually use, it will `acquire` it. Only `acquire` blocks.
|
|
*/
|
|
template< typename Item >
|
|
class exports::Dropbox
|
|
{
|
|
public:
|
|
using FinishedException= synthetic_exception< struct finished_exception, DropboxFinishedException >;
|
|
//using AnyTaggedFinishedException= AnyTagged< FinishedException >;
|
|
//template< typename tag > using TaggedFinishedException= Tagged< FinishedException, tag >;
|
|
|
|
using FinishedCondition= synthetic_exception< struct finished_exception, DropboxFinishedCondition, FinishedException >;
|
|
//using AnyTaggedFinishedCondition= AnyTagged< FinishedCondition >;
|
|
//template< typename tag > using TaggedFinishedCondition= Tagged< FinishedCondition, tag >;
|
|
|
|
private:
|
|
const std::size_t boxLimit;
|
|
std::size_t weight= 0;
|
|
|
|
struct Box
|
|
{
|
|
std::vector< Item > items;
|
|
std::vector< Item >::iterator pos= begin( items );
|
|
std::exception_ptr notification;
|
|
};
|
|
DeadDrop< Box > drops;
|
|
|
|
void
|
|
sendoff()
|
|
{
|
|
auto &producer= drops.producer.acquireBox();
|
|
producer.pos= begin( producer.items );
|
|
weight= 0;
|
|
|
|
drops.producer.releaseBox();
|
|
}
|
|
|
|
void
|
|
recycle()
|
|
{
|
|
auto &filled= drops.consumer.acquireBox();
|
|
filled.notification= nullptr;
|
|
filled.items.clear();
|
|
drops.consumer.releaseBox();
|
|
}
|
|
|
|
public:
|
|
explicit
|
|
Dropbox( const size_t lim )
|
|
: boxLimit( lim )
|
|
{
|
|
recycle();
|
|
}
|
|
|
|
Item
|
|
pop()
|
|
{
|
|
auto &filled= drops.consumer.acquireBox();
|
|
if( filled.pos == end( filled.items ) )
|
|
{
|
|
[[unlikely]];
|
|
auto notification= std::move( filled.notification );
|
|
recycle();
|
|
std::rethrow_exception( std::move( notification ) );
|
|
}
|
|
assert( not filled.items.empty() );
|
|
auto &rv= *filled.pos++;
|
|
// If it was the last item and we're going to rotate/reccyle,
|
|
// be sure that we don't reference the now-stale value.
|
|
// This is a special case that will only come up at the end of each queue
|
|
// cycle.
|
|
//
|
|
// I need to research if NRV applies in this circumstance -- if so, then the
|
|
// `rv` variable can be an instance instead of a reference.
|
|
if( filled.pos == end( filled.items ) and filled.notification == nullptr )
|
|
{
|
|
[[unlikely]];
|
|
auto rvTemp= std::move( rv );
|
|
recycle();
|
|
return rvTemp;
|
|
}
|
|
return std::move( rv );
|
|
}
|
|
|
|
bool pushWouldBlock() const noexcept { return weight > boxLimit; }
|
|
|
|
|
|
void
|
|
push( Item item )
|
|
{
|
|
if( pushWouldBlock() ) [[unlikely]] sendoff();
|
|
assertion( weight <= boxLimit );
|
|
|
|
auto &preparing= drops.producer.acquireBox();
|
|
|
|
weight+= computeWeight( item );
|
|
preparing.items.push_back( std::move( item ) );
|
|
}
|
|
|
|
void
|
|
notify( auto newNotification )
|
|
{
|
|
auto &preparing= drops.producer.acquireBox();
|
|
preparing.notification= std::make_exception_ptr( std::move( newNotification ) );
|
|
sendoff();
|
|
}
|
|
|
|
void
|
|
finish( auto newNotification )
|
|
{
|
|
auto &preparing= drops.producer.acquireBox();
|
|
preparing.notification= std::make_exception_ptr( std::move( newNotification ) );
|
|
sendoff();
|
|
}
|
|
|
|
void
|
|
finish()
|
|
{
|
|
finish( build_exception< FinishedCondition >( "" ) );
|
|
}
|
|
};
|
|
}
|
|
|
|
namespace Alepha::Hydrogen::Atomic::inline exports::inline Dropbox_m
|
|
{
|
|
using namespace detail::Dropbox_m::exports;
|
|
}
|