async_invoker.h 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. /*
  2. * Copyright 2014 The WebRTC Project Authors. All rights reserved.
  3. *
  4. * Use of this source code is governed by a BSD-style license
  5. * that can be found in the LICENSE file in the root of the source
  6. * tree. An additional intellectual property rights grant can be found
  7. * in the file PATENTS. All contributing project authors may
  8. * be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #ifndef RTC_BASE_ASYNC_INVOKER_H_
  11. #define RTC_BASE_ASYNC_INVOKER_H_
  12. #include <atomic>
  13. #include <memory>
  14. #include <utility>
  15. #include "api/scoped_refptr.h"
  16. #include "rtc_base/async_invoker_inl.h"
  17. #include "rtc_base/bind.h"
  18. #include "rtc_base/constructor_magic.h"
  19. #include "rtc_base/event.h"
  20. #include "rtc_base/ref_counted_object.h"
  21. #include "rtc_base/third_party/sigslot/sigslot.h"
  22. #include "rtc_base/thread.h"
  23. namespace rtc {
  24. // Invokes function objects (aka functors) asynchronously on a Thread, and
  25. // owns the lifetime of calls (ie, when this object is destroyed, calls in
  26. // flight are cancelled). AsyncInvoker can optionally execute a user-specified
  27. // function when the asynchronous call is complete, or operates in
  28. // fire-and-forget mode otherwise.
  29. //
  30. // AsyncInvoker does not own the thread it calls functors on.
  31. //
  32. // A note about async calls and object lifetimes: users should
  33. // be mindful of object lifetimes when calling functions asynchronously and
  34. // ensure objects used by the function _cannot_ be deleted between the
  35. // invocation and execution of the functor. AsyncInvoker is designed to
  36. // help: any calls in flight will be cancelled when the AsyncInvoker used to
  37. // make the call is destructed, and any calls executing will be allowed to
  38. // complete before AsyncInvoker destructs.
  39. //
  40. // The easiest way to ensure lifetimes are handled correctly is to create a
  41. // class that owns the Thread and AsyncInvoker objects, and then call its
  42. // methods asynchronously as needed.
  43. //
  44. // Example:
  45. // class MyClass {
  46. // public:
  47. // void FireAsyncTaskWithResult(Thread* thread, int x) {
  48. // // Specify a callback to get the result upon completion.
  49. // invoker_.AsyncInvoke<int>(RTC_FROM_HERE,
  50. // thread, Bind(&MyClass::AsyncTaskWithResult, this, x),
  51. // &MyClass::OnTaskComplete, this);
  52. // }
//     void FireAnotherAsyncTask(Thread* thread) {
//       // No callback specified means fire-and-forget.
//       invoker_.AsyncInvoke<void>(RTC_FROM_HERE,
//           thread, Bind(&MyClass::AnotherAsyncTask, this));
//     }
//
  58. // private:
  59. // int AsyncTaskWithResult(int x) {
  60. // // Some long running process...
  61. // return x * x;
  62. // }
  63. // void AnotherAsyncTask() {
  64. // // Some other long running process...
  65. // }
  66. // void OnTaskComplete(int result) { result_ = result; }
  67. //
  68. // AsyncInvoker invoker_;
  69. // int result_;
  70. // };
  71. //
  72. // More details about threading:
  73. // - It's safe to construct/destruct AsyncInvoker on different threads.
  74. // - It's safe to call AsyncInvoke from different threads.
  75. // - It's safe to call AsyncInvoke recursively from *within* a functor that's
  76. // being AsyncInvoked.
  77. // - However, it's *not* safe to call AsyncInvoke from *outside* a functor
  78. // that's being AsyncInvoked while the AsyncInvoker is being destroyed on
  79. // another thread. This is just inherently unsafe and there's no way to
  80. // prevent that. So, the user of this class should ensure that the start of
  81. // each "chain" of invocations is synchronized somehow with the AsyncInvoker's
  82. // destruction. This can be done by starting each chain of invocations on the
  83. // same thread on which it will be destroyed, or by using some other
  84. // synchronization method.
  85. class AsyncInvoker : public MessageHandler {
  86. public:
  87. AsyncInvoker();
  88. ~AsyncInvoker() override;
  89. // Call |functor| asynchronously on |thread|, with no callback upon
  90. // completion. Returns immediately.
  91. template <class ReturnT, class FunctorT>
  92. void AsyncInvoke(const Location& posted_from,
  93. Thread* thread,
  94. FunctorT&& functor,
  95. uint32_t id = 0) {
  96. std::unique_ptr<AsyncClosure> closure(
  97. new FireAndForgetAsyncClosure<FunctorT>(
  98. this, std::forward<FunctorT>(functor)));
  99. DoInvoke(posted_from, thread, std::move(closure), id);
  100. }
  101. // Call |functor| asynchronously on |thread| with |delay_ms|, with no callback
  102. // upon completion. Returns immediately.
  103. template <class ReturnT, class FunctorT>
  104. void AsyncInvokeDelayed(const Location& posted_from,
  105. Thread* thread,
  106. FunctorT&& functor,
  107. uint32_t delay_ms,
  108. uint32_t id = 0) {
  109. std::unique_ptr<AsyncClosure> closure(
  110. new FireAndForgetAsyncClosure<FunctorT>(
  111. this, std::forward<FunctorT>(functor)));
  112. DoInvokeDelayed(posted_from, thread, std::move(closure), delay_ms, id);
  113. }
  114. // Synchronously execute on |thread| all outstanding calls we own
  115. // that are pending on |thread|, and wait for calls to complete
  116. // before returning. Optionally filter by message id.
  117. // The destructor will not wait for outstanding calls, so if that
  118. // behavior is desired, call Flush() before destroying this object.
  119. void Flush(Thread* thread, uint32_t id = MQID_ANY);
  120. // Cancels any outstanding calls we own that are pending on any thread, and
  121. // which have not yet started to execute. This does not wait for any calls
  122. // that have already started executing to complete.
  123. void Clear();
  124. private:
  125. void OnMessage(Message* msg) override;
  126. void DoInvoke(const Location& posted_from,
  127. Thread* thread,
  128. std::unique_ptr<AsyncClosure> closure,
  129. uint32_t id);
  130. void DoInvokeDelayed(const Location& posted_from,
  131. Thread* thread,
  132. std::unique_ptr<AsyncClosure> closure,
  133. uint32_t delay_ms,
  134. uint32_t id);
  135. // Used to keep track of how many invocations (AsyncClosures) are still
  136. // alive, so that the destructor can wait for them to finish, as described in
  137. // the class documentation.
  138. //
  139. // TODO(deadbeef): Using a raw std::atomic like this is prone to error and
  140. // difficult to maintain. We should try to wrap this functionality in a
  141. // separate class to reduce the chance of errors being introduced in the
  142. // future.
  143. std::atomic<int> pending_invocations_;
  144. // Reference counted so that if the AsyncInvoker destructor finishes before
  145. // an AsyncClosure's destructor that's about to call
  146. // "invocation_complete_->Set()", it's not dereferenced after being
  147. // destroyed.
  148. scoped_refptr<RefCountedObject<Event>> invocation_complete_;
  149. // This flag is used to ensure that if an application AsyncInvokes tasks that
  150. // recursively AsyncInvoke other tasks ad infinitum, the cycle eventually
  151. // terminates.
  152. std::atomic<bool> destroying_;
  153. friend class AsyncClosure;
  154. RTC_DISALLOW_COPY_AND_ASSIGN(AsyncInvoker);
  155. };
  156. // Similar to AsyncInvoker, but guards against the Thread being destroyed while
  157. // there are outstanding dangling pointers to it. It will connect to the current
  158. // thread in the constructor, and will get notified when that thread is
  159. // destroyed. After GuardedAsyncInvoker is constructed, it can be used from
  160. // other threads to post functors to the thread it was constructed on. If that
  161. // thread dies, any further calls to AsyncInvoke() will be safely ignored.
  162. class GuardedAsyncInvoker : public sigslot::has_slots<> {
  163. public:
  164. GuardedAsyncInvoker();
  165. ~GuardedAsyncInvoker() override;
  166. // Synchronously execute all outstanding calls we own, and wait for calls to
  167. // complete before returning. Optionally filter by message id. The destructor
  168. // will not wait for outstanding calls, so if that behavior is desired, call
  169. // Flush() first. Returns false if the thread has died.
  170. bool Flush(uint32_t id = MQID_ANY);
  171. // Call |functor| asynchronously with no callback upon completion. Returns
  172. // immediately. Returns false if the thread has died.
  173. template <class ReturnT, class FunctorT>
  174. bool AsyncInvoke(const Location& posted_from,
  175. FunctorT&& functor,
  176. uint32_t id = 0) {
  177. CritScope cs(&crit_);
  178. if (thread_ == nullptr)
  179. return false;
  180. invoker_.AsyncInvoke<ReturnT, FunctorT>(
  181. posted_from, thread_, std::forward<FunctorT>(functor), id);
  182. return true;
  183. }
  184. // Call |functor| asynchronously with |delay_ms|, with no callback upon
  185. // completion. Returns immediately. Returns false if the thread has died.
  186. template <class ReturnT, class FunctorT>
  187. bool AsyncInvokeDelayed(const Location& posted_from,
  188. FunctorT&& functor,
  189. uint32_t delay_ms,
  190. uint32_t id = 0) {
  191. CritScope cs(&crit_);
  192. if (thread_ == nullptr)
  193. return false;
  194. invoker_.AsyncInvokeDelayed<ReturnT, FunctorT>(
  195. posted_from, thread_, std::forward<FunctorT>(functor), delay_ms, id);
  196. return true;
  197. }
  198. // Call |functor| asynchronously, calling |callback| when done. Returns false
  199. // if the thread has died.
  200. template <class ReturnT, class FunctorT, class HostT>
  201. bool AsyncInvoke(const Location& posted_from,
  202. const Location& callback_posted_from,
  203. FunctorT&& functor,
  204. void (HostT::*callback)(ReturnT),
  205. HostT* callback_host,
  206. uint32_t id = 0) {
  207. CritScope cs(&crit_);
  208. if (thread_ == nullptr)
  209. return false;
  210. invoker_.AsyncInvoke<ReturnT, FunctorT, HostT>(
  211. posted_from, callback_posted_from, thread_,
  212. std::forward<FunctorT>(functor), callback, callback_host, id);
  213. return true;
  214. }
  215. // Call |functor| asynchronously calling |callback| when done. Overloaded for
  216. // void return. Returns false if the thread has died.
  217. template <class ReturnT, class FunctorT, class HostT>
  218. bool AsyncInvoke(const Location& posted_from,
  219. const Location& callback_posted_from,
  220. FunctorT&& functor,
  221. void (HostT::*callback)(),
  222. HostT* callback_host,
  223. uint32_t id = 0) {
  224. CritScope cs(&crit_);
  225. if (thread_ == nullptr)
  226. return false;
  227. invoker_.AsyncInvoke<ReturnT, FunctorT, HostT>(
  228. posted_from, callback_posted_from, thread_,
  229. std::forward<FunctorT>(functor), callback, callback_host, id);
  230. return true;
  231. }
  232. private:
  233. // Callback when |thread_| is destroyed.
  234. void ThreadDestroyed();
  235. CriticalSection crit_;
  236. Thread* thread_ RTC_GUARDED_BY(crit_);
  237. AsyncInvoker invoker_ RTC_GUARDED_BY(crit_);
  238. };
  239. } // namespace rtc
  240. #endif // RTC_BASE_ASYNC_INVOKER_H_