123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- // Copyright (c) 2012 The Chromium Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file.
- #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
- #define BASE_OBSERVER_LIST_THREADSAFE_H_
- #include <unordered_map>
- #include <utility>
- #include <vector>
- #include "base/base_export.h"
- #include "base/bind.h"
- #include "base/check_op.h"
- #include "base/lazy_instance.h"
- #include "base/location.h"
- #include "base/memory/ref_counted.h"
- #include "base/observer_list.h"
- #include "base/sequenced_task_runner.h"
- #include "base/stl_util.h"
- #include "base/synchronization/lock.h"
- #include "base/threading/sequenced_task_runner_handle.h"
- #include "base/threading/thread_local.h"
- #include "build/build_config.h"
- ///////////////////////////////////////////////////////////////////////////////
- //
- // OVERVIEW:
- //
- // A thread-safe container for a list of observers. This is similar to the
- // observer_list (see observer_list.h), but it is more robust for multi-
- // threaded situations.
- //
- // The following use cases are supported:
- // * Observers can register for notifications from any sequence. They are
- // always notified on the sequence from which they were registered.
- // * Any sequence may trigger a notification via Notify().
- // * Observers can remove themselves from the observer list inside of a
- // callback.
- // * If one sequence is notifying observers concurrently with an observer
- // removing itself from the observer list, the notifications will be
- // silently dropped.
- //
- // The drawback of the threadsafe observer list is that notifications are not
- // as real-time as the non-threadsafe version of this class. Notifications
- // will always be done via PostTask() to another sequence, whereas with the
- // non-thread-safe observer_list, notifications happen synchronously.
- //
- ///////////////////////////////////////////////////////////////////////////////
- namespace base {
- namespace internal {
- class BASE_EXPORT ObserverListThreadSafeBase
- : public RefCountedThreadSafe<ObserverListThreadSafeBase> {
- public:
- ObserverListThreadSafeBase() = default;
- ObserverListThreadSafeBase(const ObserverListThreadSafeBase&) = delete;
- ObserverListThreadSafeBase& operator=(const ObserverListThreadSafeBase&) =
- delete;
- protected:
- template <typename ObserverType, typename Method>
- struct Dispatcher;
- template <typename ObserverType, typename ReceiverType, typename... Params>
- struct Dispatcher<ObserverType, void (ReceiverType::*)(Params...)> {
- static void Run(void (ReceiverType::*m)(Params...),
- Params... params,
- ObserverType* obj) {
- (obj->*m)(std::forward<Params>(params)...);
- }
- };
- struct NotificationDataBase {
- NotificationDataBase(void* observer_list_in, const Location& from_here_in)
- : observer_list(observer_list_in), from_here(from_here_in) {}
- void* observer_list;
- Location from_here;
- };
- virtual ~ObserverListThreadSafeBase() = default;
- static LazyInstance<ThreadLocalPointer<const NotificationDataBase>>::Leaky
- tls_current_notification_;
- private:
- friend class RefCountedThreadSafe<ObserverListThreadSafeBase>;
- };
- } // namespace internal
- template <class ObserverType>
- class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase {
- public:
- ObserverListThreadSafe() = default;
- explicit ObserverListThreadSafe(ObserverListPolicy policy)
- : policy_(policy) {}
- ObserverListThreadSafe(const ObserverListThreadSafe&) = delete;
- ObserverListThreadSafe& operator=(const ObserverListThreadSafe&) = delete;
- // Adds |observer| to the list. |observer| must not already be in the list.
- void AddObserver(ObserverType* observer) {
- DCHECK(SequencedTaskRunnerHandle::IsSet())
- << "An observer can only be registered when SequencedTaskRunnerHandle "
- "is set. If this is in a unit test, you're likely merely missing a "
- "base::test::(SingleThread)TaskEnvironment in your fixture. "
- "Otherwise, try running this code on a named thread (main/UI/IO) or "
- "from a task posted to a base::SequencedTaskRunner or "
- "base::SingleThreadTaskRunner.";
- AutoLock auto_lock(lock_);
- // Add |observer| to the list of observers.
- DCHECK(!Contains(observers_, observer));
- const scoped_refptr<SequencedTaskRunner> task_runner =
- SequencedTaskRunnerHandle::Get();
- observers_[observer] = task_runner;
- // If this is called while a notification is being dispatched on this thread
- // and |policy_| is ALL, |observer| must be notified (if a notification is
- // being dispatched on another thread in parallel, the notification may or
- // may not make it to |observer| depending on the outcome of the race to
- // |lock_|).
- if (policy_ == ObserverListPolicy::ALL) {
- const NotificationDataBase* current_notification =
- tls_current_notification_.Get().Get();
- if (current_notification && current_notification->observer_list == this) {
- task_runner->PostTask(
- current_notification->from_here,
- BindOnce(
- &ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
- observer,
- *static_cast<const NotificationData*>(current_notification)));
- }
- }
- }
- // Remove an observer from the list if it is in the list.
- //
- // If a notification was sent to the observer but hasn't started to run yet,
- // it will be aborted. If a notification has started to run, removing the
- // observer won't stop it.
- void RemoveObserver(ObserverType* observer) {
- AutoLock auto_lock(lock_);
- observers_.erase(observer);
- }
- // Verifies that the list is currently empty (i.e. there are no observers).
- void AssertEmpty() const {
- #if DCHECK_IS_ON()
- AutoLock auto_lock(lock_);
- DCHECK(observers_.empty());
- #endif
- }
- // Asynchronously invokes a callback on all observers, on their registration
- // sequence. You cannot assume that at the completion of the Notify call that
- // all Observers have been Notified. The notification may still be pending
- // delivery.
- template <typename Method, typename... Params>
- void Notify(const Location& from_here, Method m, Params&&... params) {
- RepeatingCallback<void(ObserverType*)> method =
- BindRepeating(&Dispatcher<ObserverType, Method>::Run, m,
- std::forward<Params>(params)...);
- AutoLock lock(lock_);
- for (const auto& observer : observers_) {
- observer.second->PostTask(
- from_here,
- BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
- observer.first, NotificationData(this, from_here, method)));
- }
- }
- // Like Notify() but attempts to synchronously invoke callbacks if they are
- // associated with this thread.
- template <typename Method, typename... Params>
- void NotifySynchronously(const Location& from_here,
- Method m,
- Params&&... params) {
- RepeatingCallback<void(ObserverType*)> method =
- BindRepeating(&Dispatcher<ObserverType, Method>::Run, m,
- std::forward<Params>(params)...);
- // The observers may make reentrant calls (which can be a problem due to the
- // lock), so we extract a list to call synchronously.
- std::vector<ObserverType*> current_sequence_observers;
- {
- AutoLock lock(lock_);
- current_sequence_observers.reserve(observers_.size());
- for (const auto& observer : observers_) {
- if (observer.second->RunsTasksInCurrentSequence()) {
- current_sequence_observers.push_back(observer.first);
- } else {
- observer.second->PostTask(
- from_here,
- BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper,
- this, observer.first,
- NotificationData(this, from_here, method)));
- }
- }
- }
- for (ObserverType* observer : current_sequence_observers) {
- NotifyWrapper(observer, NotificationData(this, from_here, method));
- }
- }
- private:
- friend class RefCountedThreadSafe<ObserverListThreadSafeBase>;
- struct NotificationData : public NotificationDataBase {
- NotificationData(ObserverListThreadSafe* observer_list_in,
- const Location& from_here_in,
- const RepeatingCallback<void(ObserverType*)>& method_in)
- : NotificationDataBase(observer_list_in, from_here_in),
- method(method_in) {}
- RepeatingCallback<void(ObserverType*)> method;
- };
- ~ObserverListThreadSafe() override = default;
- void NotifyWrapper(ObserverType* observer,
- const NotificationData& notification) {
- {
- AutoLock auto_lock(lock_);
- // Check whether the observer still needs a notification.
- auto it = observers_.find(observer);
- if (it == observers_.end())
- return;
- DCHECK(it->second->RunsTasksInCurrentSequence());
- }
- // Keep track of the notification being dispatched on the current thread.
- // This will be used if the callback below calls AddObserver().
- //
- // Note: |tls_current_notification_| may not be nullptr if this runs in a
- // nested loop started by a notification callback. In that case, it is
- // important to save the previous value to restore it later.
- auto& tls_current_notification = tls_current_notification_.Get();
- const NotificationDataBase* const previous_notification =
- tls_current_notification.Get();
- tls_current_notification.Set(¬ification);
- // Invoke the callback.
- notification.method.Run(observer);
- // Reset the notification being dispatched on the current thread to its
- // previous value.
- tls_current_notification.Set(previous_notification);
- }
- const ObserverListPolicy policy_ = ObserverListPolicy::ALL;
- mutable Lock lock_;
- // Keys are observers. Values are the SequencedTaskRunners on which they must
- // be notified.
- std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>>
- observers_ GUARDED_BY(lock_);
- };
- } // namespace base
- #endif // BASE_OBSERVER_LIST_THREADSAFE_H_
|