thread_group_impl.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. // Copyright 2016 The Chromium Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file.
  4. #ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
  5. #define BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
  6. #include <stddef.h>
  7. #include <memory>
  8. #include <string>
  9. #include <vector>
  10. #include "base/base_export.h"
  11. #include "base/check_op.h"
  12. #include "base/compiler_specific.h"
  13. #include "base/containers/stack.h"
  14. #include "base/gtest_prod_util.h"
  15. #include "base/macros.h"
  16. #include "base/memory/ref_counted.h"
  17. #include "base/optional.h"
  18. #include "base/sequenced_task_runner.h"
  19. #include "base/strings/string_piece.h"
  20. #include "base/synchronization/condition_variable.h"
  21. #include "base/synchronization/waitable_event.h"
  22. #include "base/task/thread_pool/task.h"
  23. #include "base/task/thread_pool/task_source.h"
  24. #include "base/task/thread_pool/thread_group.h"
  25. #include "base/task/thread_pool/tracked_ref.h"
  26. #include "base/task/thread_pool/worker_thread.h"
  27. #include "base/task/thread_pool/worker_thread_stack.h"
  28. #include "base/time/time.h"
  29. namespace base {
  30. class HistogramBase;
  31. class WorkerThreadObserver;
  32. namespace internal {
  33. class TaskTracker;
  34. // A group of workers that run Tasks.
  35. //
  36. // The thread group doesn't create threads until Start() is called. Tasks can be
  37. // posted at any time but will not run until after Start() is called.
  38. //
  39. // This class is thread-safe.
  40. class BASE_EXPORT ThreadGroupImpl : public ThreadGroup {
  41. public:
  42. // Constructs a group without workers.
  43. //
  44. // |histogram_label| is used to label the thread group's histograms as
  45. // "ThreadPool." + histogram_name + "." + |histogram_label| + extra suffixes.
  46. // It must not be empty. |thread group_label| is used to label the thread
  47. // group's threads, it must not be empty. |priority_hint| is the preferred
  48. // thread priority; the actual thread priority depends on shutdown state and
  49. // platform capabilities. |task_tracker| keeps track of tasks.
  50. ThreadGroupImpl(StringPiece histogram_label,
  51. StringPiece thread_group_label,
  52. ThreadPriority priority_hint,
  53. TrackedRef<TaskTracker> task_tracker,
  54. TrackedRef<Delegate> delegate);
  55. // Creates threads, allowing existing and future tasks to run. The thread
  56. // group runs at most |max_tasks| / |max_best_effort_tasks| unblocked task
  57. // with any / BEST_EFFORT priority concurrently. It reclaims unused threads
  58. // after |suggested_reclaim_time|. It uses |service_thread_task_runner| to
  59. // monitor for blocked tasks. If specified, it notifies
  60. // |worker_thread_observer| when a worker enters and exits its main function
  61. // (the observer must not be destroyed before JoinForTesting() has returned).
  62. // |worker_environment| specifies the environment in which tasks are executed.
  63. // |may_block_threshold| is the timeout after which a task in a MAY_BLOCK
  64. // ScopedBlockingCall is considered blocked (the thread group will choose an
  65. // appropriate value if none is specified).
  66. // |synchronous_thread_start_for_testing| is true if this ThreadGroupImpl
  67. // should synchronously wait for OnMainEntry() after starting each worker. Can
  68. // only be called once. CHECKs on failure.
  69. void Start(int max_tasks,
  70. int max_best_effort_tasks,
  71. TimeDelta suggested_reclaim_time,
  72. scoped_refptr<SequencedTaskRunner> service_thread_task_runner,
  73. WorkerThreadObserver* worker_thread_observer,
  74. WorkerEnvironment worker_environment,
  75. bool synchronous_thread_start_for_testing = false,
  76. Optional<TimeDelta> may_block_threshold = Optional<TimeDelta>());
  77. // Destroying a ThreadGroupImpl returned by Create() is not allowed in
  78. // production; it is always leaked. In tests, it can only be destroyed after
  79. // JoinForTesting() has returned.
  80. ~ThreadGroupImpl() override;
  81. // ThreadGroup:
  82. void JoinForTesting() override;
  83. size_t GetMaxConcurrentNonBlockedTasksDeprecated() const override;
  84. void DidUpdateCanRunPolicy() override;
  85. const HistogramBase* num_tasks_before_detach_histogram() const {
  86. return num_tasks_before_detach_histogram_;
  87. }
  88. // Waits until at least |n| workers are idle. Note that while workers are
  89. // disallowed from cleaning up during this call: tests using a custom
  90. // |suggested_reclaim_time_| need to be careful to invoke this swiftly after
  91. // unblocking the waited upon workers as: if a worker is already detached by
  92. // the time this is invoked, it will never make it onto the idle stack and
  93. // this call will hang.
  94. void WaitForWorkersIdleForTesting(size_t n);
  95. // Waits until at least |n| workers are idle.
  96. void WaitForWorkersIdleLockRequiredForTesting(size_t n)
  97. EXCLUSIVE_LOCKS_REQUIRED(lock_);
  98. // Waits until all workers are idle.
  99. void WaitForAllWorkersIdleForTesting();
  100. // Waits until |n| workers have cleaned up (went through
  101. // WorkerThreadDelegateImpl::OnMainExit()) since the last call to
  102. // WaitForWorkersCleanedUpForTesting() (or Start() if that wasn't called yet).
  103. void WaitForWorkersCleanedUpForTesting(size_t n);
  104. // Returns the number of workers in this thread group.
  105. size_t NumberOfWorkersForTesting() const;
  106. // Returns |max_tasks_|/|max_best_effort_tasks_|.
  107. size_t GetMaxTasksForTesting() const;
  108. size_t GetMaxBestEffortTasksForTesting() const;
  109. // Returns the number of workers that are idle (i.e. not running tasks).
  110. size_t NumberOfIdleWorkersForTesting() const;
  111. private:
  112. class ScopedCommandsExecutor;
  113. class WorkerThreadDelegateImpl;
  114. // Friend tests so that they can access |blocked_workers_poll_period| and
  115. // may_block_threshold().
  116. friend class ThreadGroupImplBlockingTest;
  117. friend class ThreadGroupImplMayBlockTest;
  118. FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
  119. ThreadBlockUnblockPremature);
  120. FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
  121. ThreadBlockUnblockPrematureBestEffort);
  122. // ThreadGroup:
  123. void UpdateSortKey(TaskSource::Transaction transaction) override;
  124. void PushTaskSourceAndWakeUpWorkers(
  125. TransactionWithRegisteredTaskSource transaction_with_task_source)
  126. override;
  127. void EnsureEnoughWorkersLockRequired(BaseScopedCommandsExecutor* executor)
  128. override EXCLUSIVE_LOCKS_REQUIRED(lock_);
  129. // Creates a worker and schedules its start, if needed, to maintain one idle
  130. // worker, |max_tasks_| permitting.
  131. void MaintainAtLeastOneIdleWorkerLockRequired(
  132. ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);
  133. // Returns true if worker cleanup is permitted.
  134. bool CanWorkerCleanupForTestingLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
  135. // Creates a worker, adds it to the thread group, schedules its start and
  136. // returns it. Cannot be called before Start().
  137. scoped_refptr<WorkerThread> CreateAndRegisterWorkerLockRequired(
  138. ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);
  139. // Returns the number of workers that are awake (i.e. not on the idle stack).
  140. size_t GetNumAwakeWorkersLockRequired() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
  141. // Returns the desired number of awake workers, given current workload and
  142. // concurrency limits.
  143. size_t GetDesiredNumAwakeWorkersLockRequired() const
  144. EXCLUSIVE_LOCKS_REQUIRED(lock_);
  145. // Examines the list of WorkerThreads and increments |max_tasks_| for each
  146. // worker that has been within the scope of a MAY_BLOCK ScopedBlockingCall for
  147. // more than BlockedThreshold(). Reschedules a call if necessary.
  148. void AdjustMaxTasks();
  149. // Returns the threshold after which the max tasks is increased to compensate
  150. // for a worker that is within a MAY_BLOCK ScopedBlockingCall.
  151. TimeDelta may_block_threshold_for_testing() const {
  152. return after_start().may_block_threshold;
  153. }
  154. // Interval at which the service thread checks for workers in this thread
  155. // group that have been in a MAY_BLOCK ScopedBlockingCall for more than
  156. // may_block_threshold().
  157. TimeDelta blocked_workers_poll_period_for_testing() const {
  158. return after_start().blocked_workers_poll_period;
  159. }
  160. // Starts calling AdjustMaxTasks() periodically on
  161. // |service_thread_task_runner_|.
  162. void ScheduleAdjustMaxTasks();
  163. // Schedules AdjustMaxTasks() through |executor| if required.
  164. void MaybeScheduleAdjustMaxTasksLockRequired(ScopedCommandsExecutor* executor)
  165. EXCLUSIVE_LOCKS_REQUIRED(lock_);
  166. // Returns true if AdjustMaxTasks() should periodically be called on
  167. // |service_thread_task_runner_|.
  168. bool ShouldPeriodicallyAdjustMaxTasksLockRequired()
  169. EXCLUSIVE_LOCKS_REQUIRED(lock_);
  170. // Updates the minimum priority allowed to run below which tasks should yield.
  171. // This should be called whenever |num_running_tasks_| or |max_tasks| changes,
  172. // or when a new task is added to |priority_queue_|.
  173. void UpdateMinAllowedPriorityLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
  174. // Increments/decrements the number of tasks of |priority| that are currently
  175. // running in this thread group. Must be invoked before/after running a task.
  176. void DecrementTasksRunningLockRequired(TaskPriority priority)
  177. EXCLUSIVE_LOCKS_REQUIRED(lock_);
  178. void IncrementTasksRunningLockRequired(TaskPriority priority)
  179. EXCLUSIVE_LOCKS_REQUIRED(lock_);
  180. // Increments/decrements the number of [best effort] tasks that can run in
  181. // this thread group.
  182. void DecrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
  183. void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
  184. void DecrementMaxBestEffortTasksLockRequired()
  185. EXCLUSIVE_LOCKS_REQUIRED(lock_);
  186. void IncrementMaxBestEffortTasksLockRequired()
  187. EXCLUSIVE_LOCKS_REQUIRED(lock_);
  188. // Values set at Start() and never modified afterwards.
  189. struct InitializedInStart {
  190. InitializedInStart();
  191. ~InitializedInStart();
  192. #if DCHECK_IS_ON()
  193. // Set after all members of this struct are set.
  194. bool initialized = false;
  195. #endif
  196. // Initial value of |max_tasks_|.
  197. size_t initial_max_tasks = 0;
  198. // Suggested reclaim time for workers.
  199. TimeDelta suggested_reclaim_time;
  200. // Environment to be initialized per worker.
  201. WorkerEnvironment worker_environment = WorkerEnvironment::NONE;
  202. scoped_refptr<SequencedTaskRunner> service_thread_task_runner;
  203. // Optional observer notified when a worker enters and exits its main.
  204. WorkerThreadObserver* worker_thread_observer = nullptr;
  205. bool may_block_without_delay;
  206. // Threshold after which the max tasks is increased to compensate for a
  207. // worker that is within a MAY_BLOCK ScopedBlockingCall.
  208. TimeDelta may_block_threshold;
  209. // The period between calls to AdjustMaxTasks() when the thread group is at
  210. // capacity.
  211. TimeDelta blocked_workers_poll_period;
  212. } initialized_in_start_;
  213. InitializedInStart& in_start() {
  214. #if DCHECK_IS_ON()
  215. DCHECK(!initialized_in_start_.initialized);
  216. #endif
  217. return initialized_in_start_;
  218. }
  219. const InitializedInStart& after_start() const {
  220. #if DCHECK_IS_ON()
  221. DCHECK(initialized_in_start_.initialized);
  222. #endif
  223. return initialized_in_start_;
  224. }
  225. const std::string thread_group_label_;
  226. const ThreadPriority priority_hint_;
  227. // All workers owned by this thread group.
  228. std::vector<scoped_refptr<WorkerThread>> workers_ GUARDED_BY(lock_);
  229. // Maximum number of tasks of any priority / BEST_EFFORT priority that can run
  230. // concurrently in this thread group.
  231. size_t max_tasks_ GUARDED_BY(lock_) = 0;
  232. size_t max_best_effort_tasks_ GUARDED_BY(lock_) = 0;
  233. // Number of tasks of any priority / BEST_EFFORT priority that are currently
  234. // running in this thread group.
  235. size_t num_running_tasks_ GUARDED_BY(lock_) = 0;
  236. size_t num_running_best_effort_tasks_ GUARDED_BY(lock_) = 0;
  237. // Number of workers running a task of any priority / BEST_EFFORT priority
  238. // that are within the scope of a MAY_BLOCK ScopedBlockingCall but haven't
  239. // caused a max tasks increase yet.
  240. int num_unresolved_may_block_ GUARDED_BY(lock_) = 0;
  241. int num_unresolved_best_effort_may_block_ GUARDED_BY(lock_) = 0;
  242. // Stack of idle workers. Initially, all workers are on this stack. A worker
  243. // is removed from the stack before its WakeUp() function is called and when
  244. // it receives work from GetWork() (a worker calls GetWork() when its sleep
  245. // timeout expires, even if its WakeUp() method hasn't been called). A worker
  246. // is pushed on this stack when it receives nullptr from GetWork().
  247. WorkerThreadStack idle_workers_stack_ GUARDED_BY(lock_);
  248. // Signaled when a worker is added to the idle workers stack.
  249. std::unique_ptr<ConditionVariable> idle_workers_stack_cv_for_testing_
  250. GUARDED_BY(lock_);
  251. // Stack that contains the timestamps of when workers get cleaned up.
  252. // Timestamps get popped off the stack as new workers are added.
  253. base::stack<TimeTicks, std::vector<TimeTicks>> cleanup_timestamps_
  254. GUARDED_BY(lock_);
  255. // Whether an AdjustMaxTasks() task was posted to the service thread.
  256. bool adjust_max_tasks_posted_ GUARDED_BY(lock_) = false;
  257. // Indicates to the delegates that workers are not permitted to cleanup.
  258. bool worker_cleanup_disallowed_for_testing_ GUARDED_BY(lock_) = false;
  259. // Counts the number of workers cleaned up (went through
  260. // WorkerThreadDelegateImpl::OnMainExit()) since the last call to
  261. // WaitForWorkersCleanedUpForTesting() (or Start() if that wasn't called yet).
  262. // |some_workers_cleaned_up_for_testing_| is true if this was ever
  263. // incremented. Tests with a custom |suggested_reclaim_time_| can wait on a
  264. // specific number of workers being cleaned up via
  265. // WaitForWorkersCleanedUpForTesting().
  266. size_t num_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = 0;
  267. #if DCHECK_IS_ON()
  268. bool some_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = false;
  269. #endif
  270. // Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is
  271. // incremented.
  272. std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_
  273. GUARDED_BY(lock_);
  274. // Set at the start of JoinForTesting().
  275. bool join_for_testing_started_ GUARDED_BY(lock_) = false;
  276. // Null-opt unless |synchronous_thread_start_for_testing| was true at
  277. // construction. In that case, it's signaled each time
  278. // WorkerThreadDelegateImpl::OnMainEntry() completes.
  279. Optional<WaitableEvent> worker_started_for_testing_;
  280. // Cached HistogramBase pointers, can be accessed without
  281. // holding |lock_|. If |lock_| is held, add new samples using
  282. // ThreadGroupImpl::ScopedCommandsExecutor (increase
  283. // |scheduled_histogram_samples_| size as needed) to defer until after |lock_|
  284. // release, due to metrics system callbacks which may schedule tasks.
  285. // ThreadPool.DetachDuration.[thread group name] histogram. Intentionally
  286. // leaked.
  287. HistogramBase* const detach_duration_histogram_;
  288. // ThreadPool.NumTasksBeforeDetach.[thread group name] histogram.
  289. // Intentionally leaked.
  290. HistogramBase* const num_tasks_before_detach_histogram_;
  291. // Ensures recently cleaned up workers (ref.
  292. // WorkerThreadDelegateImpl::CleanupLockRequired()) had time to exit as
  293. // they have a raw reference to |this| (and to TaskTracker) which can
  294. // otherwise result in racy use-after-frees per no longer being part of
  295. // |workers_| and hence not being explicitly joined in JoinForTesting():
  296. // https://crbug.com/810464. Uses AtomicRefCount to make its only public
  297. // method thread-safe.
  298. TrackedRefFactory<ThreadGroupImpl> tracked_ref_factory_;
  299. DISALLOW_COPY_AND_ASSIGN(ThreadGroupImpl);
  300. };
  301. } // namespace internal
  302. } // namespace base
  303. #endif // BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_