concurrent_queue.h 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. // Ceres Solver - A fast non-linear least squares minimizer
  2. // Copyright 2023 Google Inc. All rights reserved.
  3. // http://ceres-solver.org/
  4. //
  5. // Redistribution and use in source and binary forms, with or without
  6. // modification, are permitted provided that the following conditions are met:
  7. //
  8. // * Redistributions of source code must retain the above copyright notice,
  9. // this list of conditions and the following disclaimer.
  10. // * Redistributions in binary form must reproduce the above copyright notice,
  11. // this list of conditions and the following disclaimer in the documentation
  12. // and/or other materials provided with the distribution.
  13. // * Neither the name of Google Inc. nor the names of its contributors may be
  14. // used to endorse or promote products derived from this software without
  15. // specific prior written permission.
  16. //
  17. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  18. // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  19. // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  20. // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  21. // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  22. // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  23. // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  24. // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  25. // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  26. // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  27. // POSSIBILITY OF SUCH DAMAGE.
  28. //
  29. // Author: vitus@google.com (Michael Vitus)
  30. #ifndef CERES_INTERNAL_CONCURRENT_QUEUE_H_
  31. #define CERES_INTERNAL_CONCURRENT_QUEUE_H_
  #include <condition_variable>
  #include <mutex>
  #include <queue>
  #include <thread>
  #include <utility>

  #include "glog/logging.h"
  37. namespace ceres::internal {
  38. // A thread-safe multi-producer, multi-consumer queue for queueing items that
  39. // are typically handled asynchronously by multiple threads. The ConcurrentQueue
  40. // has two states which only affect the Wait call:
  41. //
  42. // (1) Waiters have been enabled (enabled by default or calling
  43. // EnableWaiters). The call to Wait will block until an item is available.
  44. // Push and pop will operate as expected.
  45. //
  46. // (2) StopWaiters has been called. All threads blocked in a Wait() call will
  47. // be woken up and pop any available items from the queue. All future Wait
  48. // requests will either return an element from the queue or return
  49. // immediately if no element is present. Push and pop will operate as
  50. // expected.
  51. //
  52. // A common use case is using the concurrent queue as an interface for
  53. // scheduling tasks for a set of thread workers:
  54. //
  55. // ConcurrentQueue<Task> task_queue;
  56. //
  57. // [Worker threads]:
  58. // Task task;
  59. // while(task_queue.Wait(&task)) {
  60. // ...
  61. // }
  62. //
  63. // [Producers]:
  64. // task_queue.Push(...);
  65. // ..
  66. // task_queue.Push(...);
  67. // ...
  68. // // Signal worker threads to stop blocking on Wait and terminate.
  69. // task_queue.StopWaiters();
  70. //
  template <typename T>
  class ConcurrentQueue {
   public:
    // Defaults the queue to blocking on Wait calls (wait_ starts out true).
    ConcurrentQueue() = default;

    // Atomically push a copy of value onto the queue. If a thread was waiting
    // for an element, wake it up.
    void Push(const T& value) {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(value);
      work_pending_condition_.notify_one();
    }

    // Atomically push value onto the queue by moving from it, so that
    // temporaries and explicitly moved arguments are not copied and move-only
    // element types can be queued. If a thread was waiting for an element,
    // wake it up.
    void Push(T&& value) {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::move(value));
      work_pending_condition_.notify_one();
    }

    // Atomically pop an element from the queue without blocking. If an element
    // is present, store it in *value and return true. If the queue was empty,
    // return false and leave *value untouched. value must not be null.
    bool Pop(T* value) {
      CHECK(value != nullptr);

      std::lock_guard<std::mutex> lock(mutex_);
      return PopUnlocked(value);
    }

    // Atomically pop an element from the queue. Blocks until one is available
    // or StopWaiters is called. Returns true if an element was successfully
    // popped into *value, otherwise returns false. value must not be null.
    bool Wait(T* value) {
      CHECK(value != nullptr);

      std::unique_lock<std::mutex> lock(mutex_);
      // Keep sleeping only while waiting is enabled and there is nothing to
      // pop; the predicate form also absorbs spurious wake-ups.
      work_pending_condition_.wait(
          lock, [&]() { return !(wait_ && queue_.empty()); });

      // Reached either because an element is available or because waiting was
      // disabled; in the latter case the queue may be empty and this returns
      // false.
      return PopUnlocked(value);
    }

    // Unblock all threads waiting to pop a value from the queue. Each woken
    // thread leaves Wait() with an element if one is present and with false
    // otherwise. All future Wait requests will return immediately if no
    // element is present until EnableWaiters is called.
    void StopWaiters() {
      std::lock_guard<std::mutex> lock(mutex_);
      wait_ = false;
      work_pending_condition_.notify_all();
    }

    // Enable threads to block on Wait calls.
    void EnableWaiters() {
      std::lock_guard<std::mutex> lock(mutex_);
      wait_ = true;
    }

   private:
    // Pops an element from the queue. If an element is present, move it into
    // *value and return true. If the queue was empty, return false. Not
    // thread-safe. Must acquire the lock before calling.
    bool PopUnlocked(T* value) {
      if (queue_.empty()) {
        return false;
      }

      // The front element is discarded on the next line, so move out of it
      // instead of copying.
      *value = std::move(queue_.front());
      queue_.pop();

      return true;
    }

    // The mutex controls read and write access to the queue_ and wait_
    // variables. It is also used to block the calling thread until an element
    // is available to pop from the queue.
    std::mutex mutex_;
    std::condition_variable work_pending_condition_;
    std::queue<T> queue_;

    // If true, signals that callers of Wait will block waiting to pop an
    // element off the queue.
    bool wait_{true};
  };
  135. } // namespace ceres::internal
  136. #endif // CERES_INTERNAL_CONCURRENT_QUEUE_H_