// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_H_
#define BASE_TASK_THREAD_POOL_THREAD_GROUP_H_

#include "base/base_export.h"
#include "base/memory/ref_counted.h"
#include "base/task/common/checked_lock.h"
#include "base/task/thread_pool/priority_queue.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/tracked_ref.h"
#include "build/build_config.h"

#if defined(OS_WIN)
#include "base/win/scoped_windows_thread_environment.h"
#endif

namespace base {
namespace internal {

class TaskTracker;

// Interface and base implementation for a thread group. A thread group is a
// subset of the threads in the thread pool (see GetThreadGroupForTraits() for
// thread group selection logic when posting tasks and creating task runners).
class BASE_EXPORT ThreadGroup {
 public:
  // Delegate interface for ThreadGroup.
  class BASE_EXPORT Delegate {
   public:
    virtual ~Delegate() = default;

    // Invoked when a TaskSource with |traits| is non-empty after the
    // ThreadGroup has run a task from it. The implementation must return the
    // thread group in which the TaskSource should be reenqueued.
    virtual ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) = 0;
  };
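  // A hypothetical sketch (not declared in this header) of a Delegate that
  // always reenqueues task sources into one fixed thread group; |group_| is
  // an assumed member shown only to illustrate the contract above:
  //
  //   class SingleGroupDelegate : public ThreadGroup::Delegate {
  //    public:
  //     explicit SingleGroupDelegate(ThreadGroup* group) : group_(group) {}
  //     ThreadGroup* GetThreadGroupForTraits(
  //         const TaskTraits& traits) override {
  //       return group_;
  //     }
  //
  //    private:
  //     ThreadGroup* const group_;
  //   };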
  enum class WorkerEnvironment {
    // No special worker environment required.
    NONE,
#if defined(OS_WIN)
    // Initialize a COM MTA on the worker.
    COM_MTA,
    // Initialize a COM STA on the worker.
    COM_STA,
#endif  // defined(OS_WIN)
  };

  virtual ~ThreadGroup();

  // Registers the thread group in TLS.
  void BindToCurrentThread();

  // Resets the thread group in TLS.
  void UnbindFromCurrentThread();

  // Returns true if the thread group is registered in TLS.
  bool IsBoundToCurrentThread() const;
  // Removes |task_source| from |priority_queue_|. Returns a
  // RegisteredTaskSource that evaluates to true if successful, or false if
  // |task_source| is not currently in |priority_queue_|, such as when a worker
  // is running a task from it.
  RegisteredTaskSource RemoveTaskSource(const TaskSource& task_source);
  // Updates the position of the TaskSource in |transaction| in this
  // ThreadGroup's PriorityQueue based on the TaskSource's current traits.
  //
  // Implementations should instantiate a concrete ScopedCommandsExecutor and
  // invoke UpdateSortKeyImpl().
  virtual void UpdateSortKey(TaskSource::Transaction transaction) = 0;

  // Pushes the TaskSource in |transaction_with_task_source| into this
  // ThreadGroup's PriorityQueue and wakes up workers as appropriate.
  //
  // Implementations should instantiate a concrete ScopedCommandsExecutor and
  // invoke PushTaskSourceAndWakeUpWorkersImpl().
  virtual void PushTaskSourceAndWakeUpWorkers(
      TransactionWithRegisteredTaskSource transaction_with_task_source) = 0;

  // Removes all task sources from this ThreadGroup's PriorityQueue and
  // enqueues them in another |destination_thread_group|. After this method is
  // called, any task sources posted to this ThreadGroup will be forwarded to
  // |destination_thread_group|.
  //
  // TODO(crbug.com/756547): Remove this method once the UseNativeThreadPool
  // experiment is complete.
  void InvalidateAndHandoffAllTaskSourcesToOtherThreadGroup(
      ThreadGroup* destination_thread_group);

  // Returns true if a task with |priority| running in this thread group should
  // return ASAP, either because this priority is not allowed to run or because
  // work of higher priority is pending. Thread-safe but may return an outdated
  // result (if a task unnecessarily yields due to this, it will simply be
  // re-scheduled).
  bool ShouldYield(TaskPriority priority) const;
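  // A minimal usage sketch (an assumption for illustration, not taken from
  // this codebase): a long-running task can poll ShouldYield() between chunks
  // of work and return early so higher-priority work gets a thread sooner.
  // |group|, |work| and PostContinuation() are hypothetical names:
  //
  //   void RunChunkedWork(ThreadGroup* group, Work* work) {
  //     while (work->HasMoreChunks()) {
  //       work->RunOneChunk();
  //       if (group->ShouldYield(TaskPriority::BEST_EFFORT)) {
  //         PostContinuation(work);  // Reschedule the remainder.
  //         return;
  //       }
  //     }
  //   }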
  // Prevents new tasks from starting to run and waits for currently running
  // tasks to complete their execution. It is guaranteed that no thread will do
  // work on behalf of this ThreadGroup after this returns. It is invalid to
  // post a task once this is called. TaskTracker::Flush() can be called before
  // this to complete existing tasks, which might otherwise post a task during
  // JoinForTesting(). This can only be called once.
  virtual void JoinForTesting() = 0;

  // Returns the maximum number of non-blocked tasks that can run concurrently
  // in this ThreadGroup.
  //
  // TODO(fdoray): Remove this method. https://crbug.com/687264
  virtual size_t GetMaxConcurrentNonBlockedTasksDeprecated() const = 0;

  // Wakes up workers as appropriate for the new CanRunPolicy policy. Must be
  // called after an update to CanRunPolicy in TaskTracker.
  virtual void DidUpdateCanRunPolicy() = 0;
 protected:
  // Derived classes must implement a ScopedCommandsExecutor that derives from
  // this to perform operations at the end of a scope, when all locks have been
  // released.
  class BaseScopedCommandsExecutor {
   public:
    void ScheduleReleaseTaskSource(RegisteredTaskSource task_source);

   protected:
    BaseScopedCommandsExecutor();
    ~BaseScopedCommandsExecutor();

   private:
    std::vector<RegisteredTaskSource> task_sources_to_release_;

    DISALLOW_COPY_AND_ASSIGN(BaseScopedCommandsExecutor);
  };
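  // A hedged sketch of what a derived executor might look like; the Worker
  // type, WakeUp() call and |workers_to_wake_| member are assumptions made for
  // illustration, not declarations from this header:
  //
  //   class MyThreadGroup::ScopedCommandsExecutor
  //       : public BaseScopedCommandsExecutor {
  //    public:
  //     ~ScopedCommandsExecutor() {
  //       // Runs at the end of the scope, after |lock_| has been released.
  //       for (Worker* worker : workers_to_wake_)
  //         worker->WakeUp();
  //     }
  //     void ScheduleWakeUp(Worker* worker) {
  //       workers_to_wake_.push_back(worker);
  //     }
  //
  //    private:
  //     std::vector<Worker*> workers_to_wake_;
  //   };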
  // Allows a task source to be pushed to a ThreadGroup's PriorityQueue at the
  // end of a scope, when all locks have been released.
  class ScopedReenqueueExecutor {
   public:
    ScopedReenqueueExecutor();
    ~ScopedReenqueueExecutor();

    // Schedules |transaction_with_task_source| to be enqueued in
    // |destination_thread_group| at the end of the scope.
    void SchedulePushTaskSourceAndWakeUpWorkers(
        TransactionWithRegisteredTaskSource transaction_with_task_source,
        ThreadGroup* destination_thread_group);

   private:
    // A TransactionWithRegisteredTaskSource and the thread group in which it
    // should be enqueued.
    Optional<TransactionWithRegisteredTaskSource> transaction_with_task_source_;
    ThreadGroup* destination_thread_group_ = nullptr;

    DISALLOW_COPY_AND_ASSIGN(ScopedReenqueueExecutor);
  };
  // |predecessor_thread_group| is a ThreadGroup whose lock can be acquired
  // before the constructed ThreadGroup's lock. This is necessary to move all
  // task sources from |predecessor_thread_group| to the constructed
  // ThreadGroup and support the UseNativeThreadPool experiment.
  //
  // TODO(crbug.com/756547): Remove |predecessor_thread_group| once the
  // experiment is complete.
  ThreadGroup(TrackedRef<TaskTracker> task_tracker,
              TrackedRef<Delegate> delegate,
              ThreadGroup* predecessor_thread_group = nullptr);

#if defined(OS_WIN)
  static std::unique_ptr<win::ScopedWindowsThreadEnvironment>
  GetScopedWindowsThreadEnvironment(WorkerEnvironment environment);
#endif

  const TrackedRef<TaskTracker> task_tracker_;
  const TrackedRef<Delegate> delegate_;
  // Returns the number of workers required to run all queued BEST_EFFORT task
  // sources allowed to run by the current CanRunPolicy.
  size_t GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Returns the number of workers required to run all queued
  // USER_VISIBLE/USER_BLOCKING task sources allowed to run by the current
  // CanRunPolicy.
  size_t GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() const
      EXCLUSIVE_LOCKS_REQUIRED(lock_);
  // Ensures that there are enough workers to run queued task sources.
  // |executor| is forwarded from the one received in
  // PushTaskSourceAndWakeUpWorkersImpl().
  virtual void EnsureEnoughWorkersLockRequired(
      BaseScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_) = 0;

  // Reenqueues a |transaction_with_task_source| from which a Task just ran in
  // the current ThreadGroup into the appropriate ThreadGroup.
  void ReEnqueueTaskSourceLockRequired(
      BaseScopedCommandsExecutor* workers_executor,
      ScopedReenqueueExecutor* reenqueue_executor,
      TransactionWithRegisteredTaskSource transaction_with_task_source)
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Returns the next task source from |priority_queue_| if permitted to run
  // and pops |priority_queue_| if the task source returned no longer needs to
  // be queued (reached its maximum concurrency). Otherwise returns nullptr and
  // pops |priority_queue_| so this can be called again.
  RegisteredTaskSource TakeRegisteredTaskSource(
      BaseScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);
  // Must be invoked by implementations of the corresponding non-Impl()
  // methods.
  void UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor,
                         TaskSource::Transaction transaction);
  void PushTaskSourceAndWakeUpWorkersImpl(
      BaseScopedCommandsExecutor* executor,
      TransactionWithRegisteredTaskSource transaction_with_task_source);
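  // For illustration only (an assumed override in a hypothetical derived
  // class, not something declared here), the expected pattern is roughly:
  //
  //   void MyThreadGroup::PushTaskSourceAndWakeUpWorkers(
  //       TransactionWithRegisteredTaskSource transaction_with_task_source) {
  //     ScopedCommandsExecutor executor(this);
  //     PushTaskSourceAndWakeUpWorkersImpl(
  //         &executor, std::move(transaction_with_task_source));
  //   }  // |executor| runs its scheduled commands here, outside |lock_|.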
  // Synchronizes accesses to all members of this class which are neither
  // const, atomic, nor immutable after start. Since this lock is a bottleneck
  // to post and schedule work, only simple data structure manipulations are
  // allowed within its scope (no thread creation or wake up).
  mutable CheckedLock lock_;

  // PriorityQueue from which all threads of this ThreadGroup get work.
  PriorityQueue priority_queue_ GUARDED_BY(lock_);

  // Minimum priority allowed to run, below which tasks should yield. This is
  // expected to be always kept up-to-date by derived classes when |lock_| is
  // released. It is annotated as GUARDED_BY(lock_) because it is always
  // updated under the lock (to avoid races with other state during the update)
  // but it is nonetheless always safe to read it without the lock (since it's
  // atomic).
  std::atomic<TaskPriority> min_allowed_priority_ GUARDED_BY(lock_){
      TaskPriority::BEST_EFFORT};
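  // A sketch of the lock-free read implied above (the relaxed memory order is
  // an assumption for illustration, and thread-safety analysis may require an
  // unchecked-read annotation for such reads):
  //
  //   const TaskPriority min_priority =
  //       min_allowed_priority_.load(std::memory_order_relaxed);
  //   if (priority < min_priority)
  //     return true;  // The calling task should yield.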
  // If |replacement_thread_group_| is non-null, this ThreadGroup is invalid
  // and all task sources should be scheduled on |replacement_thread_group_|.
  // Used to support the UseNativeThreadPool experiment.
  ThreadGroup* replacement_thread_group_ = nullptr;

 private:
  DISALLOW_COPY_AND_ASSIGN(ThreadGroup);
};

}  // namespace internal
}  // namespace base

#endif  // BASE_TASK_THREAD_POOL_THREAD_GROUP_H_