observer_list_threadsafe.h 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. // Copyright (c) 2012 The Chromium Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file.
  4. #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
  5. #define BASE_OBSERVER_LIST_THREADSAFE_H_
  6. #include <unordered_map>
  7. #include <utility>
  8. #include <vector>
  9. #include "base/base_export.h"
  10. #include "base/bind.h"
  11. #include "base/check_op.h"
  12. #include "base/lazy_instance.h"
  13. #include "base/location.h"
  14. #include "base/memory/ref_counted.h"
  15. #include "base/observer_list.h"
  16. #include "base/sequenced_task_runner.h"
  17. #include "base/stl_util.h"
  18. #include "base/synchronization/lock.h"
  19. #include "base/threading/sequenced_task_runner_handle.h"
  20. #include "base/threading/thread_local.h"
  21. #include "build/build_config.h"
///////////////////////////////////////////////////////////////////////////////
//
// OVERVIEW:
//
//   A thread-safe container for a list of observers. This is similar to the
//   observer_list (see observer_list.h), but it is more robust for multi-
//   threaded situations.
//
//   The following use cases are supported:
//    * Observers can register for notifications from any sequence. They are
//      always notified on the sequence from which they were registered.
//    * Any sequence may trigger a notification via Notify().
//    * Observers can remove themselves from the observer list inside of a
//      callback.
//    * If one sequence is notifying observers concurrently with an observer
//      removing itself from the observer list, the notifications will be
//      silently dropped.
//
//   The drawback of the threadsafe observer list is that notifications are not
//   as real-time as the non-threadsafe version of this class. Notifications
//   will always be done via PostTask() to another sequence, whereas with the
//   non-thread-safe observer_list, notifications happen synchronously.
//
///////////////////////////////////////////////////////////////////////////////
  46. namespace base {
  47. namespace internal {
  48. class BASE_EXPORT ObserverListThreadSafeBase
  49. : public RefCountedThreadSafe<ObserverListThreadSafeBase> {
  50. public:
  51. ObserverListThreadSafeBase() = default;
  52. ObserverListThreadSafeBase(const ObserverListThreadSafeBase&) = delete;
  53. ObserverListThreadSafeBase& operator=(const ObserverListThreadSafeBase&) =
  54. delete;
  55. protected:
  56. template <typename ObserverType, typename Method>
  57. struct Dispatcher;
  58. template <typename ObserverType, typename ReceiverType, typename... Params>
  59. struct Dispatcher<ObserverType, void (ReceiverType::*)(Params...)> {
  60. static void Run(void (ReceiverType::*m)(Params...),
  61. Params... params,
  62. ObserverType* obj) {
  63. (obj->*m)(std::forward<Params>(params)...);
  64. }
  65. };
  66. struct NotificationDataBase {
  67. NotificationDataBase(void* observer_list_in, const Location& from_here_in)
  68. : observer_list(observer_list_in), from_here(from_here_in) {}
  69. void* observer_list;
  70. Location from_here;
  71. };
  72. virtual ~ObserverListThreadSafeBase() = default;
  73. static LazyInstance<ThreadLocalPointer<const NotificationDataBase>>::Leaky
  74. tls_current_notification_;
  75. private:
  76. friend class RefCountedThreadSafe<ObserverListThreadSafeBase>;
  77. };
  78. } // namespace internal
  79. template <class ObserverType>
  80. class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase {
  81. public:
  82. ObserverListThreadSafe() = default;
  83. explicit ObserverListThreadSafe(ObserverListPolicy policy)
  84. : policy_(policy) {}
  85. ObserverListThreadSafe(const ObserverListThreadSafe&) = delete;
  86. ObserverListThreadSafe& operator=(const ObserverListThreadSafe&) = delete;
  87. // Adds |observer| to the list. |observer| must not already be in the list.
  88. void AddObserver(ObserverType* observer) {
  89. DCHECK(SequencedTaskRunnerHandle::IsSet())
  90. << "An observer can only be registered when SequencedTaskRunnerHandle "
  91. "is set. If this is in a unit test, you're likely merely missing a "
  92. "base::test::(SingleThread)TaskEnvironment in your fixture. "
  93. "Otherwise, try running this code on a named thread (main/UI/IO) or "
  94. "from a task posted to a base::SequencedTaskRunner or "
  95. "base::SingleThreadTaskRunner.";
  96. AutoLock auto_lock(lock_);
  97. // Add |observer| to the list of observers.
  98. DCHECK(!Contains(observers_, observer));
  99. const scoped_refptr<SequencedTaskRunner> task_runner =
  100. SequencedTaskRunnerHandle::Get();
  101. observers_[observer] = task_runner;
  102. // If this is called while a notification is being dispatched on this thread
  103. // and |policy_| is ALL, |observer| must be notified (if a notification is
  104. // being dispatched on another thread in parallel, the notification may or
  105. // may not make it to |observer| depending on the outcome of the race to
  106. // |lock_|).
  107. if (policy_ == ObserverListPolicy::ALL) {
  108. const NotificationDataBase* current_notification =
  109. tls_current_notification_.Get().Get();
  110. if (current_notification && current_notification->observer_list == this) {
  111. task_runner->PostTask(
  112. current_notification->from_here,
  113. BindOnce(
  114. &ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
  115. observer,
  116. *static_cast<const NotificationData*>(current_notification)));
  117. }
  118. }
  119. }
  120. // Remove an observer from the list if it is in the list.
  121. //
  122. // If a notification was sent to the observer but hasn't started to run yet,
  123. // it will be aborted. If a notification has started to run, removing the
  124. // observer won't stop it.
  125. void RemoveObserver(ObserverType* observer) {
  126. AutoLock auto_lock(lock_);
  127. observers_.erase(observer);
  128. }
  129. // Verifies that the list is currently empty (i.e. there are no observers).
  130. void AssertEmpty() const {
  131. #if DCHECK_IS_ON()
  132. AutoLock auto_lock(lock_);
  133. DCHECK(observers_.empty());
  134. #endif
  135. }
  136. // Asynchronously invokes a callback on all observers, on their registration
  137. // sequence. You cannot assume that at the completion of the Notify call that
  138. // all Observers have been Notified. The notification may still be pending
  139. // delivery.
  140. template <typename Method, typename... Params>
  141. void Notify(const Location& from_here, Method m, Params&&... params) {
  142. RepeatingCallback<void(ObserverType*)> method =
  143. BindRepeating(&Dispatcher<ObserverType, Method>::Run, m,
  144. std::forward<Params>(params)...);
  145. AutoLock lock(lock_);
  146. for (const auto& observer : observers_) {
  147. observer.second->PostTask(
  148. from_here,
  149. BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
  150. observer.first, NotificationData(this, from_here, method)));
  151. }
  152. }
  153. // Like Notify() but attempts to synchronously invoke callbacks if they are
  154. // associated with this thread.
  155. template <typename Method, typename... Params>
  156. void NotifySynchronously(const Location& from_here,
  157. Method m,
  158. Params&&... params) {
  159. RepeatingCallback<void(ObserverType*)> method =
  160. BindRepeating(&Dispatcher<ObserverType, Method>::Run, m,
  161. std::forward<Params>(params)...);
  162. // The observers may make reentrant calls (which can be a problem due to the
  163. // lock), so we extract a list to call synchronously.
  164. std::vector<ObserverType*> current_sequence_observers;
  165. {
  166. AutoLock lock(lock_);
  167. current_sequence_observers.reserve(observers_.size());
  168. for (const auto& observer : observers_) {
  169. if (observer.second->RunsTasksInCurrentSequence()) {
  170. current_sequence_observers.push_back(observer.first);
  171. } else {
  172. observer.second->PostTask(
  173. from_here,
  174. BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper,
  175. this, observer.first,
  176. NotificationData(this, from_here, method)));
  177. }
  178. }
  179. }
  180. for (ObserverType* observer : current_sequence_observers) {
  181. NotifyWrapper(observer, NotificationData(this, from_here, method));
  182. }
  183. }
  184. private:
  185. friend class RefCountedThreadSafe<ObserverListThreadSafeBase>;
  186. struct NotificationData : public NotificationDataBase {
  187. NotificationData(ObserverListThreadSafe* observer_list_in,
  188. const Location& from_here_in,
  189. const RepeatingCallback<void(ObserverType*)>& method_in)
  190. : NotificationDataBase(observer_list_in, from_here_in),
  191. method(method_in) {}
  192. RepeatingCallback<void(ObserverType*)> method;
  193. };
  194. ~ObserverListThreadSafe() override = default;
  195. void NotifyWrapper(ObserverType* observer,
  196. const NotificationData& notification) {
  197. {
  198. AutoLock auto_lock(lock_);
  199. // Check whether the observer still needs a notification.
  200. auto it = observers_.find(observer);
  201. if (it == observers_.end())
  202. return;
  203. DCHECK(it->second->RunsTasksInCurrentSequence());
  204. }
  205. // Keep track of the notification being dispatched on the current thread.
  206. // This will be used if the callback below calls AddObserver().
  207. //
  208. // Note: |tls_current_notification_| may not be nullptr if this runs in a
  209. // nested loop started by a notification callback. In that case, it is
  210. // important to save the previous value to restore it later.
  211. auto& tls_current_notification = tls_current_notification_.Get();
  212. const NotificationDataBase* const previous_notification =
  213. tls_current_notification.Get();
  214. tls_current_notification.Set(&notification);
  215. // Invoke the callback.
  216. notification.method.Run(observer);
  217. // Reset the notification being dispatched on the current thread to its
  218. // previous value.
  219. tls_current_notification.Set(previous_notification);
  220. }
  221. const ObserverListPolicy policy_ = ObserverListPolicy::ALL;
  222. mutable Lock lock_;
  223. // Keys are observers. Values are the SequencedTaskRunners on which they must
  224. // be notified.
  225. std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>>
  226. observers_ GUARDED_BY(lock_);
  227. };
  228. } // namespace base
  229. #endif // BASE_OBSERVER_LIST_THREADSAFE_H_