buffered_channel.hpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. // Copyright Oliver Kowalke 2016.
  2. // Distributed under the Boost Software License, Version 1.0.
  3. // (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. //
  6. #ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
  7. #define BOOST_FIBERS_BUFFERED_CHANNEL_H
  8. #include <atomic>
  9. #include <chrono>
  10. #include <cstddef>
  11. #include <cstdint>
  12. #include <memory>
  13. #include <type_traits>
  14. #include <boost/config.hpp>
  15. #include <boost/fiber/channel_op_status.hpp>
  16. #include <boost/fiber/context.hpp>
  17. #include <boost/fiber/waker.hpp>
  18. #include <boost/fiber/detail/config.hpp>
  19. #include <boost/fiber/detail/convert.hpp>
  20. #include <boost/fiber/detail/spinlock.hpp>
  21. #include <boost/fiber/exceptions.hpp>
  22. #ifdef BOOST_HAS_ABI_HEADERS
  23. # include BOOST_ABI_PREFIX
  24. #endif
  25. namespace boost {
  26. namespace fibers {
  27. template< typename T >
  28. class buffered_channel {
  29. public:
  30. using value_type = typename std::remove_reference<T>::type;
  31. private:
  32. using slot_type = value_type;
  33. mutable detail::spinlock splk_{};
  34. wait_queue waiting_producers_{};
  35. wait_queue waiting_consumers_{};
  36. slot_type * slots_;
  37. std::size_t pidx_{ 0 };
  38. std::size_t cidx_{ 0 };
  39. std::size_t capacity_;
  40. bool closed_{ false };
  41. bool is_full_() const noexcept {
  42. return cidx_ == ((pidx_ + 1) % capacity_);
  43. }
  44. bool is_empty_() const noexcept {
  45. return cidx_ == pidx_;
  46. }
  47. bool is_closed_() const noexcept {
  48. return closed_;
  49. }
  50. public:
  51. explicit buffered_channel( std::size_t capacity) :
  52. capacity_{ capacity } {
  53. if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
  54. throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
  55. "boost fiber: buffer capacity is invalid" };
  56. }
  57. slots_ = new slot_type[capacity_];
  58. }
  59. ~buffered_channel() {
  60. close();
  61. delete [] slots_;
  62. }
  63. buffered_channel( buffered_channel const&) = delete;
  64. buffered_channel & operator=( buffered_channel const&) = delete;
  65. bool is_closed() const noexcept {
  66. detail::spinlock_lock lk{ splk_ };
  67. return is_closed_();
  68. }
  69. void close() noexcept {
  70. detail::spinlock_lock lk{ splk_ };
  71. if ( ! closed_) {
  72. closed_ = true;
  73. waiting_producers_.notify_all();
  74. waiting_consumers_.notify_all();
  75. }
  76. }
  77. channel_op_status try_push( value_type const& value) {
  78. detail::spinlock_lock lk{ splk_ };
  79. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  80. return channel_op_status::closed;
  81. }
  82. if ( is_full_() ) {
  83. return channel_op_status::full;
  84. }
  85. slots_[pidx_] = value;
  86. pidx_ = (pidx_ + 1) % capacity_;
  87. waiting_consumers_.notify_one();
  88. return channel_op_status::success;
  89. }
  90. channel_op_status try_push( value_type && value) {
  91. detail::spinlock_lock lk{ splk_ };
  92. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  93. return channel_op_status::closed;
  94. }
  95. if ( is_full_() ) {
  96. return channel_op_status::full;
  97. }
  98. slots_[pidx_] = std::move( value);
  99. pidx_ = (pidx_ + 1) % capacity_;
  100. waiting_consumers_.notify_one();
  101. return channel_op_status::success;
  102. }
  103. channel_op_status push( value_type const& value) {
  104. context * active_ctx = context::active();
  105. for (;;) {
  106. detail::spinlock_lock lk{ splk_ };
  107. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  108. return channel_op_status::closed;
  109. }
  110. if ( is_full_() ) {
  111. waiting_producers_.suspend_and_wait( lk, active_ctx);
  112. } else {
  113. slots_[pidx_] = value;
  114. pidx_ = (pidx_ + 1) % capacity_;
  115. waiting_consumers_.notify_one();
  116. return channel_op_status::success;
  117. }
  118. }
  119. }
  120. channel_op_status push( value_type && value) {
  121. context * active_ctx = context::active();
  122. for (;;) {
  123. detail::spinlock_lock lk{ splk_ };
  124. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  125. return channel_op_status::closed;
  126. }
  127. if ( is_full_() ) {
  128. waiting_producers_.suspend_and_wait( lk, active_ctx);
  129. } else {
  130. slots_[pidx_] = std::move( value);
  131. pidx_ = (pidx_ + 1) % capacity_;
  132. waiting_consumers_.notify_one();
  133. return channel_op_status::success;
  134. }
  135. }
  136. }
  137. template< typename Rep, typename Period >
  138. channel_op_status push_wait_for( value_type const& value,
  139. std::chrono::duration< Rep, Period > const& timeout_duration) {
  140. return push_wait_until( value,
  141. std::chrono::steady_clock::now() + timeout_duration);
  142. }
  143. template< typename Rep, typename Period >
  144. channel_op_status push_wait_for( value_type && value,
  145. std::chrono::duration< Rep, Period > const& timeout_duration) {
  146. return push_wait_until( std::forward< value_type >( value),
  147. std::chrono::steady_clock::now() + timeout_duration);
  148. }
  149. template< typename Clock, typename Duration >
  150. channel_op_status push_wait_until( value_type const& value,
  151. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  152. context * active_ctx = context::active();
  153. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  154. for (;;) {
  155. detail::spinlock_lock lk{ splk_ };
  156. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  157. return channel_op_status::closed;
  158. }
  159. if ( is_full_() ) {
  160. if ( ! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
  161. return channel_op_status::timeout;
  162. }
  163. } else {
  164. slots_[pidx_] = value;
  165. pidx_ = (pidx_ + 1) % capacity_;
  166. waiting_consumers_.notify_one();
  167. return channel_op_status::success;
  168. }
  169. }
  170. }
  171. template< typename Clock, typename Duration >
  172. channel_op_status push_wait_until( value_type && value,
  173. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  174. context * active_ctx = context::active();
  175. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  176. for (;;) {
  177. detail::spinlock_lock lk{ splk_ };
  178. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  179. return channel_op_status::closed;
  180. }
  181. if ( is_full_() ) {
  182. if ( ! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
  183. return channel_op_status::timeout;
  184. }
  185. } else {
  186. slots_[pidx_] = std::move( value);
  187. pidx_ = (pidx_ + 1) % capacity_;
  188. // notify one waiting consumer
  189. waiting_consumers_.notify_one();
  190. return channel_op_status::success;
  191. }
  192. }
  193. }
  194. channel_op_status try_pop( value_type & value) {
  195. detail::spinlock_lock lk{ splk_ };
  196. if ( is_empty_() ) {
  197. return is_closed_()
  198. ? channel_op_status::closed
  199. : channel_op_status::empty;
  200. }
  201. value = std::move( slots_[cidx_]);
  202. cidx_ = (cidx_ + 1) % capacity_;
  203. waiting_producers_.notify_one();
  204. return channel_op_status::success;
  205. }
  206. channel_op_status pop( value_type & value) {
  207. context * active_ctx = context::active();
  208. for (;;) {
  209. detail::spinlock_lock lk{ splk_ };
  210. if ( is_empty_() ) {
  211. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  212. return channel_op_status::closed;
  213. }
  214. waiting_consumers_.suspend_and_wait( lk, active_ctx);
  215. } else {
  216. value = std::move( slots_[cidx_]);
  217. cidx_ = (cidx_ + 1) % capacity_;
  218. waiting_producers_.notify_one();
  219. return channel_op_status::success;
  220. }
  221. }
  222. }
  223. value_type value_pop() {
  224. context * active_ctx = context::active();
  225. for (;;) {
  226. detail::spinlock_lock lk{ splk_ };
  227. if ( is_empty_() ) {
  228. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  229. throw fiber_error{
  230. std::make_error_code( std::errc::operation_not_permitted),
  231. "boost fiber: channel is closed" };
  232. }
  233. waiting_consumers_.suspend_and_wait( lk, active_ctx);
  234. } else {
  235. value_type value = std::move( slots_[cidx_]);
  236. cidx_ = (cidx_ + 1) % capacity_;
  237. waiting_producers_.notify_one();
  238. return value;
  239. }
  240. }
  241. }
  242. template< typename Rep, typename Period >
  243. channel_op_status pop_wait_for( value_type & value,
  244. std::chrono::duration< Rep, Period > const& timeout_duration) {
  245. return pop_wait_until( value,
  246. std::chrono::steady_clock::now() + timeout_duration);
  247. }
  248. template< typename Clock, typename Duration >
  249. channel_op_status pop_wait_until( value_type & value,
  250. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  251. context * active_ctx = context::active();
  252. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  253. for (;;) {
  254. detail::spinlock_lock lk{ splk_ };
  255. if ( is_empty_() ) {
  256. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  257. return channel_op_status::closed;
  258. }
  259. if ( ! waiting_consumers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
  260. return channel_op_status::timeout;
  261. }
  262. } else {
  263. value = std::move( slots_[cidx_]);
  264. cidx_ = (cidx_ + 1) % capacity_;
  265. waiting_producers_.notify_one();
  266. return channel_op_status::success;
  267. }
  268. }
  269. }
  270. class iterator {
  271. private:
  272. typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
  273. buffered_channel * chan_{ nullptr };
  274. storage_type storage_;
  275. void increment_( bool initial = false) {
  276. BOOST_ASSERT( nullptr != chan_);
  277. try {
  278. if ( ! initial) {
  279. reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
  280. }
  281. ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
  282. } catch ( fiber_error const&) {
  283. chan_ = nullptr;
  284. }
  285. }
  286. public:
  287. using iterator_category = std::input_iterator_tag;
  288. using difference_type = std::ptrdiff_t;
  289. using pointer = value_type *;
  290. using reference = value_type &;
  291. using pointer_t = pointer;
  292. using reference_t = reference;
  293. iterator() noexcept = default;
  294. explicit iterator( buffered_channel< T > * chan) noexcept :
  295. chan_{ chan } {
  296. increment_( true);
  297. }
  298. iterator( iterator const& other) noexcept :
  299. chan_{ other.chan_ } {
  300. }
  301. iterator & operator=( iterator const& other) noexcept {
  302. if ( BOOST_LIKELY( this != & other) ) {
  303. chan_ = other.chan_;
  304. }
  305. return * this;
  306. }
  307. bool operator==( iterator const& other) const noexcept {
  308. return other.chan_ == chan_;
  309. }
  310. bool operator!=( iterator const& other) const noexcept {
  311. return other.chan_ != chan_;
  312. }
  313. iterator & operator++() {
  314. reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
  315. increment_();
  316. return * this;
  317. }
  318. const iterator operator++( int) = delete;
  319. reference_t operator*() noexcept {
  320. return * reinterpret_cast< value_type * >( std::addressof( storage_) );
  321. }
  322. pointer_t operator->() noexcept {
  323. return reinterpret_cast< value_type * >( std::addressof( storage_) );
  324. }
  325. };
  326. friend class iterator;
  327. };
  328. template< typename T >
  329. typename buffered_channel< T >::iterator
  330. begin( buffered_channel< T > & chan) {
  331. return typename buffered_channel< T >::iterator( & chan);
  332. }
  333. template< typename T >
  334. typename buffered_channel< T >::iterator
  335. end( buffered_channel< T > &) {
  336. return typename buffered_channel< T >::iterator();
  337. }
  338. }}
  339. #ifdef BOOST_HAS_ABI_HEADERS
  340. # include BOOST_ABI_SUFFIX
  341. #endif
  342. #endif // BOOST_FIBERS_BUFFERED_CHANNEL_H