swap_queue.h 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. /*
  2. * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
  3. *
  4. * Use of this source code is governed by a BSD-style license
  5. * that can be found in the LICENSE file in the root of the source
  6. * tree. An additional intellectual property rights grant can be found
  7. * in the file PATENTS. All contributing project authors may
  8. * be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #ifndef RTC_BASE_SWAP_QUEUE_H_
  11. #define RTC_BASE_SWAP_QUEUE_H_
  12. #include <stddef.h>
  13. #include <atomic>
  14. #include <utility>
  15. #include <vector>
  16. #include "rtc_base/checks.h"
  17. #include "rtc_base/system/unused.h"
  18. namespace webrtc {
  19. namespace internal {
  20. // (Internal; please don't use outside this file.)
  21. template <typename T>
  22. bool NoopSwapQueueItemVerifierFunction(const T&) {
  23. return true;
  24. }
  25. } // namespace internal
  26. // Functor to use when supplying a verifier function for the queue.
  27. template <typename T,
  28. bool (*QueueItemVerifierFunction)(const T&) =
  29. internal::NoopSwapQueueItemVerifierFunction>
  30. class SwapQueueItemVerifier {
  31. public:
  32. bool operator()(const T& t) const { return QueueItemVerifierFunction(t); }
  33. };
  34. // This class is a fixed-size queue. A single producer calls Insert() to insert
  35. // an element of type T at the back of the queue, and a single consumer calls
  36. // Remove() to remove an element from the front of the queue. It's safe for the
  37. // producer and the consumer to access the queue concurrently, from different
  38. // threads.
  39. //
  40. // To avoid the construction, copying, and destruction of Ts that a naive
  41. // queue implementation would require, for each "full" T passed from
  42. // producer to consumer, SwapQueue<T> passes an "empty" T in the other
  43. // direction (an "empty" T is one that contains nothing of value for the
  44. // consumer). This bidirectional movement is implemented with swap().
  45. //
  46. // // Create queue:
  47. // Bottle proto(568); // Prepare an empty Bottle. Heap allocates space for
  48. // // 568 ml.
  49. // SwapQueue<Bottle> q(N, proto); // Init queue with N copies of proto.
  50. // // Each copy allocates on the heap.
  51. // // Producer pseudo-code:
  52. // Bottle b(568); // Prepare an empty Bottle. Heap allocates space for 568 ml.
  53. // loop {
  54. // b.Fill(amount); // Where amount <= 568 ml.
  55. // q.Insert(&b); // Swap our full Bottle for an empty one from q.
  56. // }
  57. //
  58. // // Consumer pseudo-code:
  59. // Bottle b(568); // Prepare an empty Bottle. Heap allocates space for 568 ml.
  60. // loop {
  61. // q.Remove(&b); // Swap our empty Bottle for the next-in-line full Bottle.
  62. // Drink(&b);
  63. // }
  64. //
  65. // For a well-behaved Bottle class, there are no allocations in the
  66. // producer, since it just fills an empty Bottle that's already large
  67. // enough; no deallocations in the consumer, since it returns each empty
  68. // Bottle to the queue after having drunk it; and no copies along the
  69. // way, since the queue uses swap() everywhere to move full Bottles in
  70. // one direction and empty ones in the other.
  71. template <typename T, typename QueueItemVerifier = SwapQueueItemVerifier<T>>
  72. class SwapQueue {
  73. public:
  74. // Creates a queue of size size and fills it with default constructed Ts.
  75. explicit SwapQueue(size_t size) : queue_(size) {
  76. RTC_DCHECK(VerifyQueueSlots());
  77. }
  78. // Same as above and accepts an item verification functor.
  79. SwapQueue(size_t size, const QueueItemVerifier& queue_item_verifier)
  80. : queue_item_verifier_(queue_item_verifier), queue_(size) {
  81. RTC_DCHECK(VerifyQueueSlots());
  82. }
  83. // Creates a queue of size size and fills it with copies of prototype.
  84. SwapQueue(size_t size, const T& prototype) : queue_(size, prototype) {
  85. RTC_DCHECK(VerifyQueueSlots());
  86. }
  87. // Same as above and accepts an item verification functor.
  88. SwapQueue(size_t size,
  89. const T& prototype,
  90. const QueueItemVerifier& queue_item_verifier)
  91. : queue_item_verifier_(queue_item_verifier), queue_(size, prototype) {
  92. RTC_DCHECK(VerifyQueueSlots());
  93. }
  94. // Resets the queue to have zero content while maintaining the queue size.
  95. // Just like Remove(), this can only be called (safely) from the
  96. // consumer.
  97. void Clear() {
  98. // Drop all non-empty elements by resetting num_elements_ and incrementing
  99. // next_read_index_ by the previous value of num_elements_. Relaxed memory
  100. // ordering is sufficient since the dropped elements are not accessed.
  101. next_read_index_ += std::atomic_exchange_explicit(
  102. &num_elements_, size_t{0}, std::memory_order_relaxed);
  103. if (next_read_index_ >= queue_.size()) {
  104. next_read_index_ -= queue_.size();
  105. }
  106. RTC_DCHECK_LT(next_read_index_, queue_.size());
  107. }
  108. // Inserts a "full" T at the back of the queue by swapping *input with an
  109. // "empty" T from the queue.
  110. // Returns true if the item was inserted or false if not (the queue was full).
  111. // When specified, the T given in *input must pass the ItemVerifier() test.
  112. // The contents of *input after the call are then also guaranteed to pass the
  113. // ItemVerifier() test.
  114. bool Insert(T* input) RTC_WARN_UNUSED_RESULT {
  115. RTC_DCHECK(input);
  116. RTC_DCHECK(queue_item_verifier_(*input));
  117. // Load the value of num_elements_. Acquire memory ordering prevents reads
  118. // and writes to queue_[next_write_index_] to be reordered to before the
  119. // load. (That element might be accessed by a concurrent call to Remove()
  120. // until the load finishes.)
  121. if (std::atomic_load_explicit(&num_elements_, std::memory_order_acquire) ==
  122. queue_.size()) {
  123. return false;
  124. }
  125. using std::swap;
  126. swap(*input, queue_[next_write_index_]);
  127. // Increment the value of num_elements_ to account for the inserted element.
  128. // Release memory ordering prevents the reads and writes to
  129. // queue_[next_write_index_] to be reordered to after the increment. (Once
  130. // the increment has finished, Remove() might start accessing that element.)
  131. const size_t old_num_elements = std::atomic_fetch_add_explicit(
  132. &num_elements_, size_t{1}, std::memory_order_release);
  133. ++next_write_index_;
  134. if (next_write_index_ == queue_.size()) {
  135. next_write_index_ = 0;
  136. }
  137. RTC_DCHECK_LT(next_write_index_, queue_.size());
  138. RTC_DCHECK_LT(old_num_elements, queue_.size());
  139. return true;
  140. }
  141. // Removes the frontmost "full" T from the queue by swapping it with
  142. // the "empty" T in *output.
  143. // Returns true if an item could be removed or false if not (the queue was
  144. // empty). When specified, The T given in *output must pass the ItemVerifier()
  145. // test and the contents of *output after the call are then also guaranteed to
  146. // pass the ItemVerifier() test.
  147. bool Remove(T* output) RTC_WARN_UNUSED_RESULT {
  148. RTC_DCHECK(output);
  149. RTC_DCHECK(queue_item_verifier_(*output));
  150. // Load the value of num_elements_. Acquire memory ordering prevents reads
  151. // and writes to queue_[next_read_index_] to be reordered to before the
  152. // load. (That element might be accessed by a concurrent call to Insert()
  153. // until the load finishes.)
  154. if (std::atomic_load_explicit(&num_elements_, std::memory_order_acquire) ==
  155. 0) {
  156. return false;
  157. }
  158. using std::swap;
  159. swap(*output, queue_[next_read_index_]);
  160. // Decrement the value of num_elements_ to account for the removed element.
  161. // Release memory ordering prevents the reads and writes to
  162. // queue_[next_write_index_] to be reordered to after the decrement. (Once
  163. // the decrement has finished, Insert() might start accessing that element.)
  164. std::atomic_fetch_sub_explicit(&num_elements_, size_t{1},
  165. std::memory_order_release);
  166. ++next_read_index_;
  167. if (next_read_index_ == queue_.size()) {
  168. next_read_index_ = 0;
  169. }
  170. RTC_DCHECK_LT(next_read_index_, queue_.size());
  171. return true;
  172. }
  173. // Returns the current number of elements in the queue. Since elements may be
  174. // concurrently added to the queue, the caller must treat this as a lower
  175. // bound, not an exact count.
  176. // May only be called by the consumer.
  177. size_t SizeAtLeast() const {
  178. // Acquire memory ordering ensures that we wait for the producer to finish
  179. // inserting any element in progress.
  180. return std::atomic_load_explicit(&num_elements_, std::memory_order_acquire);
  181. }
  182. private:
  183. // Verify that the queue slots complies with the ItemVerifier test. This
  184. // function is not thread-safe and can only be used in the constructors.
  185. bool VerifyQueueSlots() {
  186. for (const auto& v : queue_) {
  187. RTC_DCHECK(queue_item_verifier_(v));
  188. }
  189. return true;
  190. }
  191. // TODO(peah): Change this to use std::function() once we can use C++11 std
  192. // lib.
  193. QueueItemVerifier queue_item_verifier_;
  194. // Only accessed by the single producer.
  195. size_t next_write_index_ = 0;
  196. // Only accessed by the single consumer.
  197. size_t next_read_index_ = 0;
  198. // Accessed by both the producer and the consumer and used for synchronization
  199. // between them.
  200. std::atomic<size_t> num_elements_{0};
  201. // The elements of the queue are acced by both the producer and the consumer,
  202. // mediated by num_elements_. queue_.size() is constant.
  203. std::vector<T> queue_;
  204. SwapQueue(const SwapQueue&) = delete;
  205. SwapQueue& operator=(const SwapQueue&) = delete;
  206. };
  207. } // namespace webrtc
  208. #endif // RTC_BASE_SWAP_QUEUE_H_