123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- // Copyright Oliver Kowalke 2013.
- // Distributed under the Boost Software License, Version 1.0.
- // (See accompanying file LICENSE_1_0.txt or copy at
- // http://www.boost.org/LICENSE_1_0.txt)
- #ifndef BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
- #define BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
- #include <atomic>
- #include <cstddef>
- #include <cstdint>
- #include <memory>
- #include <type_traits>
- #include <utility>
- #include <vector>
- #include <boost/assert.hpp>
- #include <boost/config.hpp>
- #include <boost/fiber/detail/config.hpp>
- #include <boost/fiber/context.hpp>
- // David Chase and Yossi Lev. Dynamic circular work-stealing deque.
- // In SPAA ’05: Proceedings of the seventeenth annual ACM symposium
- // on Parallelism in algorithms and architectures, pages 21–28,
- // New York, NY, USA, 2005. ACM.
- //
- // Nhat Minh Lê, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli. 2013.
- // Correct and efficient work-stealing for weak memory models.
- // In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice
- // of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80.
- #if BOOST_COMP_CLANG
- #pragma clang diagnostic push
- #pragma clang diagnostic ignored "-Wunused-private-field"
- #endif
- namespace boost {
- namespace fibers {
- namespace detail {
- class context_spmc_queue {
- private:
- class array {
- private:
- typedef std::atomic< context * > atomic_type;
- typedef atomic_type storage_type;
- std::size_t capacity_;
- storage_type * storage_;
- public:
- array( std::size_t capacity) :
- capacity_{ capacity },
- storage_{ new storage_type[capacity_] } {
- for ( std::size_t i = 0; i < capacity_; ++i) {
- ::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr };
- }
- }
- ~array() {
- for ( std::size_t i = 0; i < capacity_; ++i) {
- reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type();
- }
- delete [] storage_;
- }
- std::size_t capacity() const noexcept {
- return capacity_;
- }
- void push( std::size_t bottom, context * ctx) noexcept {
- reinterpret_cast< atomic_type * >(
- std::addressof( storage_[bottom % capacity_]) )
- ->store( ctx, std::memory_order_relaxed);
- }
- context * pop( std::size_t top) noexcept {
- return reinterpret_cast< atomic_type * >(
- std::addressof( storage_[top % capacity_]) )
- ->load( std::memory_order_relaxed);
- }
- array * resize( std::size_t bottom, std::size_t top) {
- std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } };
- for ( std::size_t i = top; i != bottom; ++i) {
- tmp->push( i, pop( i) );
- }
- return tmp.release();
- }
- };
- std::atomic< std::size_t > top_{ 0 };
- std::atomic< std::size_t > bottom_{ 0 };
- std::atomic< array * > array_;
- std::vector< array * > old_arrays_{};
- char padding_[cacheline_length];
- public:
- context_spmc_queue( std::size_t capacity = 4096) :
- array_{ new array{ capacity } } {
- old_arrays_.reserve( 32);
- }
- ~context_spmc_queue() {
- for ( array * a : old_arrays_) {
- delete a;
- }
- delete array_.load();
- }
- context_spmc_queue( context_spmc_queue const&) = delete;
- context_spmc_queue & operator=( context_spmc_queue const&) = delete;
- bool empty() const noexcept {
- std::size_t bottom = bottom_.load( std::memory_order_relaxed);
- std::size_t top = top_.load( std::memory_order_relaxed);
- return bottom <= top;
- }
- void push( context * ctx) {
- std::size_t bottom = bottom_.load( std::memory_order_relaxed);
- std::size_t top = top_.load( std::memory_order_acquire);
- array * a = array_.load( std::memory_order_relaxed);
- if ( (a->capacity() - 1) < (bottom - top) ) {
- // queue is full
- // resize
- array * tmp = a->resize( bottom, top);
- old_arrays_.push_back( a);
- std::swap( a, tmp);
- array_.store( a, std::memory_order_relaxed);
- }
- a->push( bottom, ctx);
- std::atomic_thread_fence( std::memory_order_release);
- bottom_.store( bottom + 1, std::memory_order_relaxed);
- }
- context * pop() {
- std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
- array * a = array_.load( std::memory_order_relaxed);
- bottom_.store( bottom, std::memory_order_relaxed);
- std::atomic_thread_fence( std::memory_order_seq_cst);
- std::size_t top = top_.load( std::memory_order_relaxed);
- context * ctx = nullptr;
- if ( top <= bottom) {
- // queue is not empty
- ctx = a->pop( bottom);
- BOOST_ASSERT( nullptr != ctx);
- if ( top == bottom) {
- // last element dequeued
- if ( ! top_.compare_exchange_strong( top, top + 1,
- std::memory_order_seq_cst,
- std::memory_order_relaxed) ) {
- // lose the race
- ctx = nullptr;
- }
- bottom_.store( bottom + 1, std::memory_order_relaxed);
- }
- } else {
- // queue is empty
- bottom_.store( bottom + 1, std::memory_order_relaxed);
- }
- return ctx;
- }
- context * steal() {
- std::size_t top = top_.load( std::memory_order_acquire);
- std::atomic_thread_fence( std::memory_order_seq_cst);
- std::size_t bottom = bottom_.load( std::memory_order_acquire);
- context * ctx = nullptr;
- if ( top < bottom) {
- // queue is not empty
- array * a = array_.load( std::memory_order_consume);
- ctx = a->pop( top);
- BOOST_ASSERT( nullptr != ctx);
- // do not steal pinned context (e.g. main-/dispatcher-context)
- if ( ctx->is_context( type::pinned_context) ) {
- return nullptr;
- }
- if ( ! top_.compare_exchange_strong( top, top + 1,
- std::memory_order_seq_cst,
- std::memory_order_relaxed) ) {
- // lose the race
- return nullptr;
- }
- }
- return ctx;
- }
- };
- }}}
- #if BOOST_COMP_CLANG
- #pragma clang diagnostic pop
- #endif
- #endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
|