operations_chain.h 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. /*
  2. * Copyright 2019 The WebRTC Project Authors. All rights reserved.
  3. *
  4. * Use of this source code is governed by a BSD-style license
  5. * that can be found in the LICENSE file in the root of the source
  6. * tree. An additional intellectual property rights grant can be found
  7. * in the file PATENTS. All contributing project authors may
  8. * be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #ifndef RTC_BASE_OPERATIONS_CHAIN_H_
  11. #define RTC_BASE_OPERATIONS_CHAIN_H_
  12. #include <functional>
  13. #include <memory>
  14. #include <queue>
  15. #include <set>
  16. #include <type_traits>
  17. #include <utility>
  18. #include "absl/types/optional.h"
  19. #include "api/scoped_refptr.h"
  20. #include "rtc_base/checks.h"
  21. #include "rtc_base/constructor_magic.h"
  22. #include "rtc_base/ref_count.h"
  23. #include "rtc_base/ref_counted_object.h"
  24. #include "rtc_base/synchronization/sequence_checker.h"
  25. #include "rtc_base/system/no_unique_address.h"
  26. namespace rtc {
  27. namespace rtc_operations_chain_internal {
  28. // Abstract base class for operations on the OperationsChain. Run() must be
  29. // invoked exactly once during the Operation's lifespan.
  30. class Operation {
  31. public:
  32. virtual ~Operation() {}
  33. virtual void Run() = 0;
  34. };
  35. // FunctorT is the same as in OperationsChain::ChainOperation(). |callback_| is
  36. // passed on to the |functor_| and is used to inform the OperationsChain that
  37. // the operation completed. The functor is responsible for invoking the
  38. // callback when the operation has completed.
  39. template <typename FunctorT>
  40. class OperationWithFunctor final : public Operation {
  41. public:
  42. OperationWithFunctor(FunctorT&& functor, std::function<void()> callback)
  43. : functor_(std::forward<FunctorT>(functor)),
  44. callback_(std::move(callback)) {}
  45. ~OperationWithFunctor() override { RTC_DCHECK(has_run_); }
  46. void Run() override {
  47. RTC_DCHECK(!has_run_);
  48. #ifdef RTC_DCHECK_IS_ON
  49. has_run_ = true;
  50. #endif // RTC_DCHECK_IS_ON
  51. // The functor being executed may invoke the callback synchronously,
  52. // marking the operation as complete. As such, |this| OperationWithFunctor
  53. // object may get deleted here, including destroying |functor_|. To
  54. // protect the functor from self-destruction while running, it is moved to
  55. // a local variable.
  56. auto functor = std::move(functor_);
  57. functor(std::move(callback_));
  58. // |this| may now be deleted; don't touch any member variables.
  59. }
  60. private:
  61. typename std::remove_reference<FunctorT>::type functor_;
  62. std::function<void()> callback_;
  63. #ifdef RTC_DCHECK_IS_ON
  64. bool has_run_ = false;
  65. #endif // RTC_DCHECK_IS_ON
  66. };
  67. } // namespace rtc_operations_chain_internal
  68. // An implementation of an operations chain. An operations chain is used to
  69. // ensure that asynchronous tasks are executed in-order with at most one task
  70. // running at a time. The notion of an operation chain is defined in
  71. // https://w3c.github.io/webrtc-pc/#dfn-operations-chain, though unlike this
  72. // implementation, the referenced definition is coupled with a peer connection.
  73. //
  74. // An operation is an asynchronous task. The operation starts when its functor
  75. // is invoked, and completes when the callback that is passed to functor is
  76. // invoked by the operation. The operation must start and complete on the same
  77. // sequence that the operation was "chained" on. As such, the OperationsChain
  78. // operates in a "single-threaded" fashion, but the asynchronous operations may
  79. // use any number of threads to achieve "in parallel" behavior.
  80. //
  81. // When an operation is chained onto the OperationsChain, it is enqueued to be
  82. // executed. Operations are executed in FIFO order, where the next operation
  83. // does not start until the previous operation has completed. OperationsChain
  84. // guarantees that:
  85. // - If the operations chain is empty when an operation is chained, the
  86. // operation starts immediately, inside ChainOperation().
  87. // - If the operations chain is not empty when an operation is chained, the
  88. // operation starts upon the previous operation completing, inside the
  89. // callback.
  90. //
  91. // An operation is contractually obligated to invoke the completion callback
  92. // exactly once. Cancelling a chained operation is not supported by the
  93. // OperationsChain; an operation that wants to be cancellable is responsible for
  94. // aborting its own steps. The callback must still be invoked.
  95. //
  96. // The OperationsChain is kept-alive through reference counting if there are
  97. // operations pending. This, together with the contract, guarantees that all
  98. // operations that are chained get executed.
  99. class OperationsChain final : public RefCountedObject<RefCountInterface> {
  100. public:
  101. static scoped_refptr<OperationsChain> Create();
  102. ~OperationsChain();
  103. void SetOnChainEmptyCallback(std::function<void()> on_chain_empty_callback);
  104. bool IsEmpty() const;
  105. // Chains an operation. Chained operations are executed in FIFO order. The
  106. // operation starts when |functor| is executed by the OperationsChain and is
  107. // contractually obligated to invoke the callback passed to it when the
  108. // operation is complete. Operations must start and complete on the same
  109. // sequence that this method was invoked on.
  110. //
  111. // If the OperationsChain is empty, the operation starts immediately.
  112. // Otherwise it starts upon the previous operation completing.
  113. //
  114. // Requirements of FunctorT:
  115. // - FunctorT is movable.
  116. // - FunctorT implements "T operator()(std::function<void()> callback)" or
  117. // "T operator()(std::function<void()> callback) const" for some T (if T is
  118. // not void, the return value is discarded in the invoking sequence). The
  119. // operator starts the operation; when the operation is complete, "callback"
  120. // MUST be invoked, and it MUST be so on the sequence that ChainOperation()
  121. // was invoked on.
  122. //
  123. // Lambda expressions are valid functors.
  124. template <typename FunctorT>
  125. void ChainOperation(FunctorT&& functor) {
  126. RTC_DCHECK_RUN_ON(&sequence_checker_);
  127. chained_operations_.push(
  128. std::make_unique<
  129. rtc_operations_chain_internal::OperationWithFunctor<FunctorT>>(
  130. std::forward<FunctorT>(functor), CreateOperationsChainCallback()));
  131. // If this is the only operation in the chain we execute it immediately.
  132. // Otherwise the callback will get invoked when the pending operation
  133. // completes which will trigger the next operation to execute.
  134. if (chained_operations_.size() == 1) {
  135. chained_operations_.front()->Run();
  136. }
  137. }
  138. private:
  139. friend class CallbackHandle;
  140. // The callback that is passed to an operation's functor (that is used to
  141. // inform the OperationsChain that the operation has completed) is of type
  142. // std::function<void()>, which is a copyable type. To allow the callback to
  143. // be copyable, it is backed up by this reference counted handle. See
  144. // CreateOperationsChainCallback().
  145. class CallbackHandle final : public RefCountedObject<RefCountInterface> {
  146. public:
  147. explicit CallbackHandle(scoped_refptr<OperationsChain> operations_chain);
  148. ~CallbackHandle();
  149. void OnOperationComplete();
  150. private:
  151. scoped_refptr<OperationsChain> operations_chain_;
  152. #ifdef RTC_DCHECK_IS_ON
  153. bool has_run_ = false;
  154. #endif // RTC_DCHECK_IS_ON
  155. RTC_DISALLOW_COPY_AND_ASSIGN(CallbackHandle);
  156. };
  157. OperationsChain();
  158. std::function<void()> CreateOperationsChainCallback();
  159. void OnOperationComplete();
  160. RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker sequence_checker_;
  161. // FIFO-list of operations that are chained. An operation that is executing
  162. // remains on this list until it has completed by invoking the callback passed
  163. // to it.
  164. std::queue<std::unique_ptr<rtc_operations_chain_internal::Operation>>
  165. chained_operations_ RTC_GUARDED_BY(sequence_checker_);
  166. absl::optional<std::function<void()>> on_chain_empty_callback_
  167. RTC_GUARDED_BY(sequence_checker_);
  168. RTC_DISALLOW_COPY_AND_ASSIGN(OperationsChain);
  169. };
  170. } // namespace rtc
  171. #endif // RTC_BASE_OPERATIONS_CHAIN_H_